1use crate::{
2 exec::{keep_peer, Operations, Peer, Rpc},
3 state::{self, Table, TableStatus},
4 TIMEOUT,
5};
6use anyhow::{bail, Context, Result};
7use bytes::Bytes;
8use serde::{Deserialize, Serialize};
9use tokio::{task::JoinSet, time::timeout};
10use uuid::Uuid;
11
12pub async fn item_get(id: Uuid, key: Bytes) -> Result<Vec<Option<Bytes>>> {
16 if id.get_version().context("invalid table id")? != uuid::Version::SortRand {
17 bail!("table id has invalid version")
18 }
19
20 let table = Table::load(id).await?.context("table not found")?;
21 let TableStatus::Created(table) = table.status else {
22 bail!("table is not created")
23 };
24
25 let mut set = JoinSet::new();
26 for peer in table.peers_for_key(&key) {
27 let rpc = ItemGet {
28 table: id,
29 key: key.clone(),
30 };
31 set.spawn(keep_peer(peer, timeout(TIMEOUT, rpc.exec(peer))));
32 }
33
34 let mut successes = 0;
35 let mut results = Vec::new();
36 while let Some(res) = set.join_next().await {
37 match res.expect("join error") {
38 (_, Ok(Ok(Ok(res)))) => {
39 successes += 1;
40 results.push(res);
41 }
42 (Peer { addr, .. }, Ok(Ok(Err(store_error)))) => {
43 log::error!("GET: store error on {addr}, {store_error}")
44 }
45 (Peer { addr, .. }, Ok(Err(rpc_error))) => {
46 log::error!("GET: rpc error on {addr}, {rpc_error}")
47 }
48 (Peer { addr, .. }, Err(timeout_error)) => {
49 log::error!("GET: timeout error on {addr}, {timeout_error}")
50 }
51 }
52
53 if successes >= table.params.r {
54 set.abort_all();
55 break;
56 }
57 }
58
59 Ok(results)
60}
61
62pub async fn item_set(id: Uuid, key: Bytes, value: Bytes) -> Result<()> {
66 if id.get_version().context("invalid table id")? != uuid::Version::SortRand {
67 bail!("table id has invalid version")
68 }
69
70 let table = Table::load(id).await?.context("table not found")?;
71 let TableStatus::Created(table) = table.status else {
72 bail!("table is not created")
73 };
74
75 let mut set = JoinSet::new();
76 for peer in table.peers_for_key(&key) {
77 let rpc = ItemSet {
78 table: id,
79 key: key.clone(),
80 value: value.clone(),
81 };
82 set.spawn(keep_peer(peer, timeout(TIMEOUT, rpc.exec(peer))));
83 }
84
85 let mut successes = 0;
86 while let Some(res) = set.join_next().await {
87 match res.expect("Join error") {
88 (_, Ok(Ok(Ok(())))) => successes += 1,
89 (Peer { addr, .. }, Ok(Ok(Err(store_error)))) => {
90 log::error!("SET: store error on {addr}, {store_error}")
91 }
92 (Peer { addr, .. }, Ok(Err(rpc_error))) => {
93 log::error!("SET: rpc error on {addr}, {rpc_error}")
94 }
95 (Peer { addr, .. }, Err(timeout_error)) => {
96 log::error!("SET: timeout error on {addr}, {timeout_error}")
97 }
98 }
99
100 if successes >= table.params.w {
101 set.detach_all();
102 return Ok(());
103 }
104 }
105
106 bail!(
107 "Failed to write to {} nodes, only {successes} succeeded.",
108 table.params.w
109 )
110}
111
112pub async fn item_list(table_id: Uuid) -> Result<Vec<(String, Vec<(Vec<u8>, Vec<u8>)>)>> {
117 let table = Table::load(table_id).await?.context("table not found")?;
118 let TableStatus::Created(table) = table.status else {
119 bail!("table is not created")
120 };
121
122 let mut set = JoinSet::new();
123 for peer in table.peers() {
124 set.spawn(keep_peer(
125 peer,
126 timeout(TIMEOUT, ItemList { table: table_id }.exec(peer)),
127 ));
128 }
129
130 let mut data = Vec::new();
131 while let Some(res) = set.join_next().await {
132 let (peer, res) = res.expect("Join error");
133 let res = match res {
134 Ok(Ok(Ok(res))) => res,
135 _ => Vec::new(),
136 };
137 data.push((peer.addr.clone(), res));
138 }
139
140 Ok(data)
141}
142
143#[derive(Debug, Serialize, Deserialize)]
144pub struct ItemGet {
145 table: Uuid,
146 key: Bytes,
147}
148
149impl Rpc for ItemGet {
150 type Request = Operations;
151 type Response = Result<Option<Bytes>, state::Error>;
152
153 async fn handle(self) -> Result<Self::Response> {
154 Ok(state::item_get(self.table, &self.key).await)
155 }
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159pub struct ItemSet {
160 table: Uuid,
161 key: Bytes,
162 value: Bytes,
163}
164
165impl Rpc for ItemSet {
166 type Request = Operations;
167 type Response = Result<(), state::Error>;
168
169 async fn handle(self) -> Result<Self::Response> {
170 Ok(state::item_set(self.table, &self.key, &self.value).await)
171 }
172}
173
174#[derive(Debug, Serialize, Deserialize)]
175pub struct ItemList {
176 table: Uuid,
177}
178
179impl Rpc for ItemList {
180 type Request = Operations;
181 type Response = Result<Vec<(Vec<u8>, Vec<u8>)>, state::Error>;
182
183 async fn handle(self) -> Result<Self::Response> {
184 Ok(state::item_list(self.table).await)
185 }
186}