ktd/exec/table.rs

1use crate::{
2    exec::{Operations, Rpc, Table, TableData, TableParams, TableStatus},
3    peer::{self, availability_zone, local_index},
4    state, PREPARE_TIME, TIMEOUT,
5};
6use anyhow::bail;
7use serde::{Deserialize, Serialize};
8use std::num::NonZeroU64;
9use std::{collections::BTreeMap, sync::LazyLock};
10use tokio::time::timeout;
11use tokio::{
12    sync::Mutex,
13    task::{JoinHandle, JoinSet},
14    time::sleep,
15};
16use uuid::Uuid;
17
/// The global list of handles to tables in the prepared state, _pending_ creation.
///
/// Maps a table id to the background task (spawned in [TablePrepare::handle])
/// that deletes the preparation after [PREPARE_TIME]. A successful
/// [TableCommit] removes the entry and aborts the task so the table survives.
static PENDING: LazyLock<Mutex<BTreeMap<Uuid, JoinHandle<()>>>> =
    LazyLock::new(|| Mutex::new(BTreeMap::new()));
23
/// All table errors.
///
/// Serializable so it can travel back to the caller as part of an RPC
/// response (see the `Response` types of [TablePrepare], [TableCommit],
/// [TableDelete]).
#[derive(thiserror::Error, Debug, Serialize, Deserialize)]
pub enum Error {
    /// A failure in the local state layer; see [state::Error].
    #[error("state error")]
    State(#[from] state::Error),
}
30
31/// Allocates a table with the given parameters and returns its ID.
32///
33/// When this function returns the table will be ready to use from this node,
34///  but not necessarily from all nodes.
35pub async fn table_create(
36    params @ TableParams { mut b, n, r, w }: TableParams,
37) -> anyhow::Result<Uuid> {
38    let id = Uuid::now_v7();
39    log::info!("{id} creating with b={b}, n={n}, r={r}, w={w}");
40
41    let mut allocation_zone = BTreeMap::new();
42    let mut allocation_peer = BTreeMap::new();
43    let mut allocated = 0;
44    b *= n;
45    for (index, peer) in peer::peers().iter().enumerate() {
46        let rpc = TablePrepare { id, request: b / n };
47        let (zone, available) = match timeout(TIMEOUT, rpc.exec(peer)).await {
48            Ok(Ok(ret)) => ret?,
49            Ok(Err(e)) => {
50                log::warn!("{e}");
51                continue;
52            }
53            Err(e) => {
54                log::warn!("{e}");
55                continue;
56            }
57        };
58        log::debug!("{id} {zone} @ {} proposed {available}", peer.addr);
59
60        let zone_remaining = (b / n).saturating_sub(*allocation_zone.get(&zone).unwrap_or(&0));
61        let available = available.min(zone_remaining);
62        log::debug!(
63            "{id} {zone} @ {} will take {available}/{zone_remaining}",
64            peer.addr
65        );
66        if available == 0 {
67            continue;
68        }
69
70        allocation_zone
71            .entry(zone.clone())
72            .and_modify(|v| *v += available)
73            .or_insert(available);
74        allocation_peer.insert((zone, index as u64), available);
75        allocated += available;
76        log::debug!("{id} allocated {allocated}/{b}");
77
78        if allocated >= b {
79            break;
80        }
81    }
82
83    log::info!(
84        "{id}: found {} peers with a total of {allocated} bandwidth",
85        allocation_peer.len()
86    );
87
88    if allocated < b {
89        for (_, index) in allocation_peer.keys() {
90            let rpc = TableDelete { id };
91            let peer = &peer::peers()[*index as usize];
92            tokio::spawn(rpc.exec(peer));
93        }
94        bail!("Could not allocate enough bandwidth.");
95    }
96
97    for x in allocation_peer.values_mut() {
98        let y = *x * b / allocated;
99        b -= y;
100        allocated -= *x;
101        *x = y;
102    }
103
104    let data = TableData {
105        allocation: allocation_peer
106            .iter()
107            .filter_map(|(&(_, p), &b)| NonZeroU64::new(b).map(|x| (p, x)))
108            .collect(),
109        params,
110    };
111
112    let mut set = JoinSet::new();
113    for &index in data.allocation.keys() {
114        let rpc = TableCommit {
115            id,
116            table: data.clone(),
117        };
118        let peer = &peer::peers()[index as usize];
119        set.spawn(timeout(TIMEOUT, rpc.exec(peer)));
120    }
121    let results = set.join_all().await;
122    log::debug!("commit results {results:?}");
123    if results.iter().flatten().any(Result::is_err) {
124        for &index in data.allocation.keys() {
125            let rpc = TableDelete { id };
126            let peer = &peer::peers()[index as usize];
127            tokio::spawn(rpc.exec(peer));
128        }
129        bail!("could not commit table");
130    }
131
132    if !data.allocation.contains_key(&(local_index() as u64)) {
133        Table {
134            id,
135            status: TableStatus::Created(data),
136        }
137        .save()
138        .await?;
139    }
140
141    Ok(id)
142}
143
/// The preparation RPC.
///
/// Used to request available bandwidth from a node.
#[derive(Debug, Serialize, Deserialize)]
pub struct TablePrepare {
    // The id of the table being created.
    id: Uuid,
    // The amount of bandwidth requested from the receiving peer.
    request: u64,
}
152
153impl Rpc for TablePrepare {
154    type Request = Operations;
155    type Response = Result<(String, u64), Error>;
156
157    /// The preparation RPC's main body.
158    ///
159    /// Computes available bandwidth and stores the prepared table.
160    /// Launches the deletion tasks to execute after [PREPARE_TIME].
161    ///
162    /// Returns this peer availability zone and proposed bandwidth for this table.
163    async fn handle(self) -> anyhow::Result<Self::Response> {
164        async fn inner(rpc: TablePrepare) -> Result<(String, u64), Error> {
165            log::info!("{} prepare", rpc.id);
166            let proposed = Table::prepare(rpc.id, rpc.request)
167                .await?
168                .map_or(0, NonZeroU64::get);
169
170            PENDING.lock().await.insert(
171                rpc.id,
172                tokio::spawn(async move {
173                    sleep(PREPARE_TIME).await;
174
175                    Table::delete_if_prepared(rpc.id)
176                        .await
177                        .expect("could not delete table");
178
179                    PENDING.lock().await.remove(&rpc.id);
180                }),
181            );
182
183            Ok((availability_zone().to_owned(), proposed))
184        }
185
186        Ok(inner(self).await)
187    }
188}
189
/// The commit RPC.
///
/// Used to commit proposed bandwidth and create a table.
#[derive(Debug, Serialize, Deserialize)]
pub struct TableCommit {
    // The id of the table being created.
    id: Uuid,
    // The full table definition, including the final per-peer allocation.
    table: TableData,
}
198
199impl Rpc for TableCommit {
200    type Request = Operations;
201    type Response = Result<(), Error>;
202
203    /// The commit RPC's main body.
204    ///
205    /// Stores the table if it is compatible with what was proposed earlier.
206    async fn handle(self) -> anyhow::Result<Self::Response> {
207        async fn inner(rpc: TableCommit) -> Result<(), Error> {
208            log::info!("{} commit", rpc.id);
209            Table::commit(rpc.id, rpc.table).await?;
210
211            if let Some(task) = PENDING.lock().await.remove(&rpc.id) {
212                task.abort();
213            }
214            Ok(())
215        }
216
217        Ok(inner(self).await)
218    }
219}
220
/// The table delete RPC.
///
/// Used to delete a table after some error occurs.
#[derive(Debug, Serialize, Deserialize)]
pub struct TableDelete {
    // The id of the table to delete.
    id: Uuid,
}
228
229/// Deletes the given table.
230pub async fn table_delete(id: Uuid) -> anyhow::Result<()> {
231    log::info!("{id} delete");
232
233    let table = Table::load(id).await?;
234    let Some(table) = table else {
235        bail!("table not found");
236    };
237    let TableStatus::Created(data) = table.status else {
238        bail!("Invalid table");
239    };
240
241    let mut set = JoinSet::new();
242    for &peer_id in data.allocation.keys() {
243        let rpc = TableDelete { id };
244        set.spawn(rpc.exec(&peer::peers()[peer_id as usize]));
245    }
246    // TODO: we don't really care here.
247    set.join_all().await;
248
249    Table::delete(id).await?;
250    Ok(())
251}
252
253impl Rpc for TableDelete {
254    type Request = Operations;
255    type Response = Result<(), Error>;
256
257    /// The delete RPC's main body.
258    ///
259    /// See [Table::delete] for more.
260    async fn handle(self) -> anyhow::Result<Self::Response> {
261        Ok(Table::delete(self.id).await.map_err(Into::into))
262    }
263}