ktd/exec/
item.rs

1use crate::{
2    exec::{keep_peer, Operations, Peer, Rpc},
3    state::{self, Table, TableStatus},
4    TIMEOUT,
5};
6use anyhow::{bail, Context, Result};
7use bytes::Bytes;
8use serde::{Deserialize, Serialize};
9use tokio::{task::JoinSet, time::timeout};
10use uuid::Uuid;
11
12/// Retrieve a key from the given table.
13///
14/// Executes [ItemGet] on `N` nodes and returns after `R` successful responses.
15pub async fn item_get(id: Uuid, key: Bytes) -> Result<Vec<Option<Bytes>>> {
16    if id.get_version().context("invalid table id")? != uuid::Version::SortRand {
17        bail!("table id has invalid version")
18    }
19
20    let table = Table::load(id).await?.context("table not found")?;
21    let TableStatus::Created(table) = table.status else {
22        bail!("table is not created")
23    };
24
25    let mut set = JoinSet::new();
26    for peer in table.peers_for_key(&key) {
27        let rpc = ItemGet {
28            table: id,
29            key: key.clone(),
30        };
31        set.spawn(keep_peer(peer, timeout(TIMEOUT, rpc.exec(peer))));
32    }
33
34    let mut successes = 0;
35    let mut results = Vec::new();
36    while let Some(res) = set.join_next().await {
37        match res.expect("join error") {
38            (_, Ok(Ok(Ok(res)))) => {
39                successes += 1;
40                results.push(res);
41            }
42            (Peer { addr, .. }, Ok(Ok(Err(store_error)))) => {
43                log::error!("GET: store error on {addr}, {store_error}")
44            }
45            (Peer { addr, .. }, Ok(Err(rpc_error))) => {
46                log::error!("GET: rpc error on {addr}, {rpc_error}")
47            }
48            (Peer { addr, .. }, Err(timeout_error)) => {
49                log::error!("GET: timeout error on {addr}, {timeout_error}")
50            }
51        }
52
53        if successes >= table.params.r {
54            set.abort_all();
55            break;
56        }
57    }
58
59    Ok(results)
60}
61
62/// Sets a key in the given table.
63///
64/// Executes [ItemSet] on `N` nodes and returns after `W` successful responses.
65pub async fn item_set(id: Uuid, key: Bytes, value: Bytes) -> Result<()> {
66    if id.get_version().context("invalid table id")? != uuid::Version::SortRand {
67        bail!("table id has invalid version")
68    }
69
70    let table = Table::load(id).await?.context("table not found")?;
71    let TableStatus::Created(table) = table.status else {
72        bail!("table is not created")
73    };
74
75    let mut set = JoinSet::new();
76    for peer in table.peers_for_key(&key) {
77        let rpc = ItemSet {
78            table: id,
79            key: key.clone(),
80            value: value.clone(),
81        };
82        set.spawn(keep_peer(peer, timeout(TIMEOUT, rpc.exec(peer))));
83    }
84
85    let mut successes = 0;
86    while let Some(res) = set.join_next().await {
87        match res.expect("Join error") {
88            (_, Ok(Ok(Ok(())))) => successes += 1,
89            (Peer { addr, .. }, Ok(Ok(Err(store_error)))) => {
90                log::error!("SET: store error on {addr}, {store_error}")
91            }
92            (Peer { addr, .. }, Ok(Err(rpc_error))) => {
93                log::error!("SET: rpc error on {addr}, {rpc_error}")
94            }
95            (Peer { addr, .. }, Err(timeout_error)) => {
96                log::error!("SET: timeout error on {addr}, {timeout_error}")
97            }
98        }
99
100        if successes >= table.params.w {
101            set.detach_all();
102            return Ok(());
103        }
104    }
105
106    bail!(
107        "Failed to write to {} nodes, only {successes} succeeded.",
108        table.params.w
109    )
110}
111
112/// Lists all key-value pairs for a given table for every node.
113///
114/// Returns a list of (address, key-value list).
115/// Executes [ItemList].
116pub async fn item_list(table_id: Uuid) -> Result<Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)>> {
117    let table = Table::load(table_id).await?.context("table not found")?;
118    let TableStatus::Created(table) = table.status else {
119        bail!("table is not created")
120    };
121
122    let mut set = JoinSet::new();
123    for peer in table.peers() {
124        set.spawn(keep_peer(
125            peer,
126            timeout(TIMEOUT, ItemList { table: table_id }.exec(peer)),
127        ));
128    }
129
130    let mut data = Vec::new();
131    while let Some(res) = set.join_next().await {
132        let (peer, res) = res.expect("Join error");
133        let res = match res {
134            Ok(Ok(Ok(res))) => res,
135            _ => Vec::new(),
136        };
137        data.push((peer.addr.clone(), res));
138    }
139
140    Ok(data)
141}
142
143#[derive(Debug, Serialize, Deserialize)]
144pub struct ItemGet {
145    table: Uuid,
146    key: Bytes,
147}
148
149impl Rpc for ItemGet {
150    type Request = Operations;
151    type Response = Result<Option<Bytes>, state::Error>;
152
153    async fn handle(self) -> Result<Self::Response> {
154        Ok(state::item_get(self.table, &self.key).await)
155    }
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159pub struct ItemSet {
160    table: Uuid,
161    key: Bytes,
162    value: Bytes,
163}
164
165impl Rpc for ItemSet {
166    type Request = Operations;
167    type Response = Result<(), state::Error>;
168
169    async fn handle(self) -> Result<Self::Response> {
170        Ok(state::item_set(self.table, &self.key, &self.value).await)
171    }
172}
173
174#[derive(Debug, Serialize, Deserialize)]
175pub struct ItemList {
176    table: Uuid,
177}
178
179impl Rpc for ItemList {
180    type Request = Operations;
181    type Response = Result<Vec<(Vec<u8>, Vec<u8>)>, state::Error>;
182
183    async fn handle(self) -> Result<Self::Response> {
184        Ok(state::item_list(self.table).await)
185    }
186}