1use crate::{
2 exec::{Operations, Rpc, Table, TableData, TableParams, TableStatus},
3 peer::{self, availability_zone, local_index},
4 state, PREPARE_TIME, TIMEOUT,
5};
6use anyhow::bail;
7use serde::{Deserialize, Serialize};
8use std::num::NonZeroU64;
9use std::{collections::BTreeMap, sync::LazyLock};
10use tokio::time::timeout;
11use tokio::{
12 sync::Mutex,
13 task::{JoinHandle, JoinSet},
14 time::sleep,
15};
16use uuid::Uuid;
17
/// Expiry tasks for tables that have been prepared but not yet committed, keyed by table id.
///
/// `TablePrepare::handle` inserts a task that sleeps for `PREPARE_TIME` and then calls
/// `Table::delete_if_prepared`; `TableCommit::handle` removes the entry and aborts the task
/// once the table has been committed.
static PENDING: LazyLock<Mutex<BTreeMap<Uuid, JoinHandle<()>>>> =
    LazyLock::new(|| Mutex::new(BTreeMap::new()));
23
/// Error carried inside the `Response` of the RPCs defined in this file.
///
/// It derives `Serialize`/`Deserialize` in addition to `thiserror::Error`, so it can be
/// embedded in the serializable RPC response types below.
#[derive(thiserror::Error, Debug, Serialize, Deserialize)]
pub enum Error {
    /// A failure coming from the `state` module; `?` converts it automatically via `#[from]`.
    #[error("state error")]
    State(#[from] state::Error),
}
30
31pub async fn table_create(
36 params @ TableParams { mut b, n, r, w }: TableParams,
37) -> anyhow::Result<Uuid> {
38 let id = Uuid::now_v7();
39 log::info!("{id} creating with b={b}, n={n}, r={r}, w={w}");
40
41 let mut allocation_zone = BTreeMap::new();
42 let mut allocation_peer = BTreeMap::new();
43 let mut allocated = 0;
44 b *= n;
45 for (index, peer) in peer::peers().iter().enumerate() {
46 let rpc = TablePrepare { id, request: b / n };
47 let (zone, available) = match timeout(TIMEOUT, rpc.exec(peer)).await {
48 Ok(Ok(ret)) => ret?,
49 Ok(Err(e)) => {
50 log::warn!("{e}");
51 continue;
52 }
53 Err(e) => {
54 log::warn!("{e}");
55 continue;
56 }
57 };
58 log::debug!("{id} {zone} @ {} proposed {available}", peer.addr);
59
60 let zone_remaining = (b / n).saturating_sub(*allocation_zone.get(&zone).unwrap_or(&0));
61 let available = available.min(zone_remaining);
62 log::debug!(
63 "{id} {zone} @ {} will take {available}/{zone_remaining}",
64 peer.addr
65 );
66 if available == 0 {
67 continue;
68 }
69
70 allocation_zone
71 .entry(zone.clone())
72 .and_modify(|v| *v += available)
73 .or_insert(available);
74 allocation_peer.insert((zone, index as u64), available);
75 allocated += available;
76 log::debug!("{id} allocated {allocated}/{b}");
77
78 if allocated >= b {
79 break;
80 }
81 }
82
83 log::info!(
84 "{id}: found {} peers with a total of {allocated} bandwidth",
85 allocation_peer.len()
86 );
87
88 if allocated < b {
89 for (_, index) in allocation_peer.keys() {
90 let rpc = TableDelete { id };
91 let peer = &peer::peers()[*index as usize];
92 tokio::spawn(rpc.exec(peer));
93 }
94 bail!("Could not allocate enough bandwidth.");
95 }
96
97 for x in allocation_peer.values_mut() {
98 let y = *x * b / allocated;
99 b -= y;
100 allocated -= *x;
101 *x = y;
102 }
103
104 let data = TableData {
105 allocation: allocation_peer
106 .iter()
107 .filter_map(|(&(_, p), &b)| NonZeroU64::new(b).map(|x| (p, x)))
108 .collect(),
109 params,
110 };
111
112 let mut set = JoinSet::new();
113 for &index in data.allocation.keys() {
114 let rpc = TableCommit {
115 id,
116 table: data.clone(),
117 };
118 let peer = &peer::peers()[index as usize];
119 set.spawn(timeout(TIMEOUT, rpc.exec(peer)));
120 }
121 let results = set.join_all().await;
122 log::debug!("commit results {results:?}");
123 if results.iter().flatten().any(Result::is_err) {
124 for &index in data.allocation.keys() {
125 let rpc = TableDelete { id };
126 let peer = &peer::peers()[index as usize];
127 tokio::spawn(rpc.exec(peer));
128 }
129 bail!("could not commit table");
130 }
131
132 if !data.allocation.contains_key(&(local_index() as u64)) {
133 Table {
134 id,
135 status: TableStatus::Created(data),
136 }
137 .save()
138 .await?;
139 }
140
141 Ok(id)
142}
143
/// RPC asking a peer to reserve bandwidth for a table that is being created.
#[derive(Debug, Serialize, Deserialize)]
pub struct TablePrepare {
    /// Id of the table being created.
    id: Uuid,
    /// Amount of bandwidth requested from the peer; passed to `Table::prepare`.
    request: u64,
}
152
153impl Rpc for TablePrepare {
154 type Request = Operations;
155 type Response = Result<(String, u64), Error>;
156
157 async fn handle(self) -> anyhow::Result<Self::Response> {
164 async fn inner(rpc: TablePrepare) -> Result<(String, u64), Error> {
165 log::info!("{} prepare", rpc.id);
166 let proposed = Table::prepare(rpc.id, rpc.request)
167 .await?
168 .map_or(0, NonZeroU64::get);
169
170 PENDING.lock().await.insert(
171 rpc.id,
172 tokio::spawn(async move {
173 sleep(PREPARE_TIME).await;
174
175 Table::delete_if_prepared(rpc.id)
176 .await
177 .expect("could not delete table");
178
179 PENDING.lock().await.remove(&rpc.id);
180 }),
181 );
182
183 Ok((availability_zone().to_owned(), proposed))
184 }
185
186 Ok(inner(self).await)
187 }
188}
189
/// RPC telling a peer to commit a table it previously prepared.
#[derive(Debug, Serialize, Deserialize)]
pub struct TableCommit {
    /// Id of the table to commit.
    id: Uuid,
    /// Final table data (allocation and parameters); passed to `Table::commit`.
    table: TableData,
}
198
199impl Rpc for TableCommit {
200 type Request = Operations;
201 type Response = Result<(), Error>;
202
203 async fn handle(self) -> anyhow::Result<Self::Response> {
207 async fn inner(rpc: TableCommit) -> Result<(), Error> {
208 log::info!("{} commit", rpc.id);
209 Table::commit(rpc.id, rpc.table).await?;
210
211 if let Some(task) = PENDING.lock().await.remove(&rpc.id) {
212 task.abort();
213 }
214 Ok(())
215 }
216
217 Ok(inner(self).await)
218 }
219}
220
/// RPC telling a peer to delete a table.
#[derive(Debug, Serialize, Deserialize)]
pub struct TableDelete {
    /// Id of the table to delete; passed to `Table::delete`.
    id: Uuid,
}
228
229pub async fn table_delete(id: Uuid) -> anyhow::Result<()> {
231 log::info!("{id} delete");
232
233 let table = Table::load(id).await?;
234 let Some(table) = table else {
235 bail!("table not found");
236 };
237 let TableStatus::Created(data) = table.status else {
238 bail!("Invalid table");
239 };
240
241 let mut set = JoinSet::new();
242 for &peer_id in data.allocation.keys() {
243 let rpc = TableDelete { id };
244 set.spawn(rpc.exec(&peer::peers()[peer_id as usize]));
245 }
246 set.join_all().await;
248
249 Table::delete(id).await?;
250 Ok(())
251}
252
253impl Rpc for TableDelete {
254 type Request = Operations;
255 type Response = Result<(), Error>;
256
257 async fn handle(self) -> anyhow::Result<Self::Response> {
261 Ok(Table::delete(self.id).await.map_err(Into::into))
262 }
263}