ktd/state/
mod.rs

1//! Handles table metadata, exposing methods to load, save and list metadata.
2
3mod bandwidth;
4mod store;
5
6use crate::state::Error::TooMuchBandwidth;
7use crate::{
8    merkle::{self, Merkle},
9    peer::{self, local_index, Peer},
10};
11use bytes::Bytes;
12use clap::Parser;
13use serde::{Deserialize, Serialize};
14use std::num::NonZeroU64;
15use std::sync::RwLock as StdRwLock;
16use std::{
17    collections::BTreeMap,
18    sync::atomic::{AtomicU64, Ordering},
19};
20use std::{collections::HashMap, sync::LazyLock};
21use store::KeyValue;
22use tokio::{
23    sync::{Mutex, RwLock},
24    time::Instant,
25};
26use uuid::Uuid;
27use xxhash_rust::xxh3::xxh3_64;
28
/// Identifier of the store table that holds table metadata.
///
/// Entries under this id are keyed by a table's UUID bytes and hold the
/// postcard-serialized [`TableStatus`] of that table.
const META: Uuid = Uuid::new_v8(*b"kitsuraimetadata");
30
/// Lifecycle state of a table, as persisted in the metadata store.
///
/// The `serde` attribute renames the variants to lowercase for formats that
/// encode variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TableStatus {
    /// Bandwidth has been reserved on this peer, but the table is not committed yet.
    Prepared { allocated: NonZeroU64 },
    /// The table is live; carries its per-peer allocation and parameters.
    Created(TableData),
    /// The table was deleted, or its preparation was abandoned.
    Deleted,
}
38
39impl TableStatus {
40    fn age(&self) -> usize {
41        match self {
42            TableStatus::Prepared { .. } => 1,
43            TableStatus::Created(_) => 2,
44            TableStatus::Deleted => 3,
45        }
46    }
47
48    fn local_bandwidth(&self) -> u64 {
49        match self {
50            &TableStatus::Prepared { allocated } => allocated.get(),
51            TableStatus::Created(TableData { allocation, .. }) => allocation
52                .get(&(local_index() as u64))
53                .copied()
54                .map_or(0, NonZeroU64::get),
55            TableStatus::Deleted => 0,
56        }
57    }
58}
59
/// Numeric parameters of a table.
///
/// In this file, `b * n` is the number of virtual positions a key hash is
/// mapped onto and `n` is the number of peers selected for each key.
/// NOTE(review): `r` and `w` are not used in this file — presumably read and
/// write quorum sizes; confirm against the callers.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct TableParams {
    pub b: u64,
    pub n: u64,
    pub r: u64,
    pub w: u64,
}
67
/// Allocation and parameters of a created table.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TableData {
    /// Bandwidth share per peer, keyed by the peer's index into `peer::peers()`.
    pub allocation: BTreeMap<u64, NonZeroU64>,
    /// Parameters used to map keys onto peers.
    pub params: TableParams,
}
73
74impl TableData {
75    /// Return the peer on position `virt` on the hash ring.
76    fn virt_to_peer(&self, virt: u64) -> &'static Peer {
77        let TableParams { b, n, .. } = self.params;
78        let ord = virt / n + virt % n * b;
79        let index = self
80            .allocation
81            .iter()
82            .scan(0, |acc, (&k, &v)| {
83                *acc += v.get();
84                Some((k, *acc))
85            })
86            .find(|&(_, acc)| acc > ord)
87            .expect("index should be less than sigma")
88            .0;
89
90        &peer::peers()[index as usize]
91    }
92
93    pub fn peers(&self) -> impl Iterator<Item = &'static Peer> + use<'_> {
94        self.allocation.keys().map(|&x| &peer::peers()[x as usize])
95    }
96
97    /// Return an iterator over the peers that are responsible for the given key.
98    pub fn peers_for_key(&self, key: &[u8]) -> impl Iterator<Item = &'static Peer> + use<'_> {
99        let hash = xxh3_64(key);
100        let TableParams { b, n, .. } = self.params;
101        let virt = ((hash as u128 * (b * n) as u128) >> 64) as u64;
102        (0..n).map(move |i| self.virt_to_peer(virt + i))
103    }
104}
105
/// Guards table metadata operations: `Table::list` and `Table::load` take it for
/// reading, while `prepare`, `commit`, `delete*` and `save` take it for writing.
static LOCK: LazyLock<RwLock<()>> = LazyLock::new(|| RwLock::new(()));
/// Merkle tree over serialized table statuses, keyed by `id.to_u128_le()`.
static MERKLE: LazyLock<Mutex<Merkle>> = LazyLock::new(|| Mutex::new(Merkle::new()));
/// Reference instant that `sched_wait` measures elapsed time from; forced in `init`.
static SCHED_BASE_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);
/// Per-table counter consumed by `sched_wait`; an entry exists for each table
/// registered as `Created` and is removed when the table is saved as `Deleted`.
static SCHED_AVAIL_AT: LazyLock<StdRwLock<HashMap<Uuid, AtomicU64>>> =
    LazyLock::new(|| StdRwLock::new(HashMap::new()));
/// Single-entry cache holding the table most recently loaded from or saved to the store.
/// It starts out as a `Deleted` table with the nil UUID.
static TABLE_CACHE: LazyLock<StdRwLock<Table>> = LazyLock::new(|| {
    StdRwLock::new(Table {
        id: Uuid::nil(),
        status: TableStatus::Deleted,
    })
});
117
/// A table's identifier together with its current lifecycle status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Table {
    /// Unique identifier of the table; its bytes are the key in the metadata store.
    pub id: Uuid,
    /// Current lifecycle status of the table.
    pub status: TableStatus,
}
123
/// Errors returned by the state layer.
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)]
pub enum Error {
    /// The table does not exist, or is not in the `Created` state
    /// (returned by the item accessors).
    #[error("table not found")]
    NotFound,
    /// `Table::prepare` was called for an id that already has a metadata entry.
    #[error("table exists")]
    Exists,
    /// `Table::commit` was called for an id that has no metadata entry.
    #[error("table not prepared")]
    NotPrepared,
    /// `Table::commit` was called for a table that is no longer `Prepared`.
    #[error("expired table")]
    Expired,
    /// The requested local bandwidth exceeds what was reserved or is available.
    #[error("bandwidth error")]
    TooMuchBandwidth,
    /// `Table::save` was called with a status that is not strictly later in the
    /// lifecycle than the stored one.
    #[error("not a growing save")]
    NotGrowing,
    /// The underlying store reported an error.
    #[error("store error")]
    Store(#[from] store::Error),
}
141
142impl Table {
143    pub async fn list() -> Result<Vec<Self>, Error> {
144        let _guard = LOCK.read().await;
145
146        log::debug!("list");
147        let tables = store::item_list(META)
148            .await?
149            .into_iter()
150            .map(|(key, value)| Self {
151                id: Uuid::from_slice(&key).expect("id deserialization failed"),
152                status: postcard::from_bytes(value.as_ref()).expect("table deserialization failed"),
153            })
154            .collect();
155
156        Ok(tables)
157    }
158
159    async fn locked_load(id: Uuid) -> Result<Option<Self>, Error> {
160        log::debug!("{id} load");
161        assert!(LOCK.try_write().is_err());
162
163        {
164            let cache = TABLE_CACHE.read().expect("poisoned");
165            if cache.id == id {
166                return Ok(Some(cache.clone()));
167            }
168        }
169        let blob = store::item_get(META, id.as_bytes()).await?;
170        let table = blob.map(|blob| Self {
171            id,
172            status: postcard::from_bytes(blob.as_ref()).expect("table deserialization failed"),
173        });
174        {
175            if let Some(table) = &table {
176                *TABLE_CACHE.write().expect("poisoned") = table.clone();
177            }
178        }
179        Ok(table)
180    }
181
182    pub async fn load(id: Uuid) -> Result<Option<Self>, Error> {
183        let _guard = LOCK.read().await;
184        Self::locked_load(id).await
185    }
186
187    async fn locked_save(&self) -> Result<(), Error> {
188        log::debug!("{} save", self.id);
189        assert!(LOCK.try_read().is_err());
190
191        let blob = postcard::to_allocvec(&self.status).expect("serialization failed");
192        store::item_set(META, self.id.as_bytes(), &blob).await?;
193
194        match &self.status {
195            TableStatus::Prepared { .. } => {}
196            TableStatus::Created(_) => {
197                MERKLE.lock().await.insert(self.id.to_u128_le(), &blob);
198                SCHED_AVAIL_AT
199                    .write()
200                    .expect("poisoned")
201                    .insert(self.id, AtomicU64::new(0));
202            }
203            TableStatus::Deleted => {
204                MERKLE.lock().await.insert(self.id.to_u128_le(), &blob);
205                SCHED_AVAIL_AT.write().expect("poisoned").remove(&self.id);
206            }
207        }
208
209        // Update cache.
210        {
211            *TABLE_CACHE.write().expect("poisoned") = self.clone();
212        }
213
214        Ok(())
215    }
216
217    pub async fn prepare(id: Uuid, requested: u64) -> Result<Option<NonZeroU64>, Error> {
218        log::debug!("{id} prepare for {requested}");
219        let _guard = LOCK.write().await;
220
221        let table = Self::locked_load(id).await?;
222        if table.is_some() {
223            return Err(Error::Exists);
224        }
225
226        let Some(proposed) = NonZeroU64::new(bandwidth::alloc(requested)) else {
227            return Ok(None);
228        };
229
230        Table {
231            id,
232            status: TableStatus::Prepared {
233                allocated: proposed,
234            },
235        }
236        .locked_save()
237        .await?;
238
239        Ok(Some(proposed))
240    }
241
242    pub async fn commit(id: Uuid, data: TableData) -> Result<(), Error> {
243        log::debug!("{id} commit");
244        let _guard = LOCK.write().await;
245
246        let table = Self::locked_load(id).await?.ok_or(Error::NotPrepared)?;
247
248        let TableStatus::Prepared {
249            allocated: pre_bandwidth,
250        } = table.status
251        else {
252            return Err(Error::Expired);
253        };
254
255        let post_bandwidth = data
256            .allocation
257            .get(&(local_index() as u64))
258            .copied()
259            .map_or(0, NonZeroU64::get);
260
261        if post_bandwidth > pre_bandwidth.get() {
262            return Err(Error::TooMuchBandwidth);
263        }
264        bandwidth::free(pre_bandwidth.get() - post_bandwidth);
265
266        Table {
267            id,
268            status: TableStatus::Created(data),
269        }
270        .locked_save()
271        .await?;
272
273        Ok(())
274    }
275
276    pub async fn delete_if_prepared(id: Uuid) -> Result<Table, Error> {
277        log::debug!("{id} delete_if_prepared");
278        let _guard = LOCK.write().await;
279
280        let mut table = Self::locked_load(id)
281            .await
282            .ok()
283            .flatten()
284            .expect("table should exists");
285
286        if let TableStatus::Prepared { allocated } = table.status {
287            table.status = TableStatus::Deleted;
288            table.locked_save().await?;
289            bandwidth::free(allocated.get());
290        }
291        Ok(table)
292    }
293
294    pub async fn delete(id: Uuid) -> Result<(), Error> {
295        log::debug!("{id} delete");
296        let _guard = LOCK.write().await;
297
298        let mut table = Self::locked_load(id)
299            .await
300            .ok()
301            .flatten()
302            .expect("table should exists");
303
304        let allocated = table.status.local_bandwidth();
305        table.status = TableStatus::Deleted;
306        table.locked_save().await?;
307        bandwidth::free(allocated);
308        store::table_delete(id).await?;
309        Ok(())
310    }
311
312    pub async fn save(&self) -> Result<(), Error> {
313        log::debug!("{} save", self.id);
314        let _guard = LOCK.write().await;
315
316        let current = Self::locked_load(self.id).await.ok().flatten();
317        if current.as_ref().map_or(0, |t| t.status.age()) >= self.status.age() {
318            return Err(Error::NotGrowing);
319        }
320
321        let pre_bandwidth = current.map_or(0, |t| t.status.local_bandwidth());
322        let post_bandwidth = self.status.local_bandwidth();
323        if pre_bandwidth > post_bandwidth {
324            bandwidth::free(pre_bandwidth - post_bandwidth);
325        } else {
326            let diff = post_bandwidth - pre_bandwidth;
327            let allocated = bandwidth::alloc(diff);
328            if diff != allocated {
329                bandwidth::free(allocated);
330                return Err(TooMuchBandwidth);
331            };
332        }
333
334        self.locked_save().await?;
335
336        Ok(())
337    }
338}
339
// Command-line options of the state layer, consumed by `init`.
// The `///` comments on the fields below are picked up by clap's derive as
// help text, so they are user-facing strings rather than plain documentation.
#[derive(Debug, Parser)]
pub struct StateCli {
    /// Configuration for the storage backend component.
    #[clap(flatten)]
    store_cli: store::StoreCli,

    /// Available _"bandwidth"_ for this peer.
    /// See [BANDWIDTH] for more details.
    #[arg(short, long, default_value = "100")]
    bandwidth: u64,
}
351
352pub async fn init(cli: StateCli) {
353    log::info!("Initialize state");
354    store::init(cli.store_cli);
355    bandwidth::init(cli.bandwidth);
356
357    LazyLock::force(&SCHED_BASE_TIME);
358
359    let mut merkle = MERKLE.lock().await;
360
361    for table in Table::list().await.expect("could not get table list") {
362        let table = Table::delete_if_prepared(table.id).await.unwrap();
363
364        let requested = table.status.local_bandwidth();
365        if bandwidth::alloc(requested) != requested {
366            panic!("Overflowed bandwidth on initialization. Database may be corrupted?");
367        }
368
369        let data = postcard::to_allocvec(&table.status).unwrap();
370        merkle.insert(table.id.to_u128_le(), &data);
371
372        if matches!(table.status, TableStatus::Created(_)) {
373            let mut available = SCHED_AVAIL_AT.write().expect("poisoned");
374            available.insert(table.id, AtomicU64::new(0));
375        }
376    }
377}
378
379async fn sched_wait(table: &Table) {
380    let bandwidth = table.status.local_bandwidth() as u32;
381
382    let instant = {
383        let available = SCHED_AVAIL_AT.read().expect("poisoned");
384        let available = available
385            .get(&table.id)
386            .expect("table should be registered");
387
388        let filter = SCHED_BASE_TIME.elapsed() - tokio::time::Duration::from_secs(1) / 10;
389        let filter = (filter * bandwidth).as_secs();
390        available.fetch_max(filter, Ordering::Relaxed);
391
392        let at = available.fetch_add(1, Ordering::Relaxed);
393        *SCHED_BASE_TIME + tokio::time::Duration::from_secs(at) / bandwidth
394    };
395    tokio::time::sleep_until(instant).await;
396}
397
398pub async fn item_get(table_id: Uuid, key: &[u8]) -> Result<Option<Bytes>, Error> {
399    // let _guard = LOCK.read().await;
400    let table = Table::load(table_id).await.unwrap();
401    if !table
402        .as_ref()
403        .is_some_and(|t| matches!(t.status, TableStatus::Created(_)))
404    {
405        return Err(Error::NotFound);
406    }
407    sched_wait(table.as_ref().unwrap()).await;
408    Ok(store::item_get(table_id, key).await?)
409}
410
411pub async fn item_set(table_id: Uuid, key: &[u8], value: &[u8]) -> Result<(), Error> {
412    // let _guard = LOCK.read().await;
413    let table = Table::load(table_id).await.unwrap();
414    if !table
415        .as_ref()
416        .is_some_and(|t| matches!(t.status, TableStatus::Created(_)))
417    {
418        return Err(Error::NotFound);
419    }
420    sched_wait(table.as_ref().unwrap()).await;
421    Ok(store::item_set(table_id, key, value).await?)
422}
423
424pub async fn item_list(table_id: Uuid) -> Result<Vec<KeyValue>, Error> {
425    // let _guard = LOCK.read().await;
426    let table = Table::load(table_id).await.unwrap();
427    if !table.is_some_and(|t| matches!(t.status, TableStatus::Created(_))) {
428        return Err(Error::NotFound);
429    }
430    Ok(store::item_list(table_id).await?)
431}
432
/// Looks up `path` in the global Merkle tree of table statuses.
///
/// Locks `MERKLE` for the duration of the lookup and returns whatever
/// `Merkle::find` yields.
/// NOTE(review): presumably the matched path and its hash — confirm against
/// `merkle::Merkle::find`.
pub async fn merkle_find(path: merkle::Path) -> Option<(merkle::Path, u128)> {
    MERKLE.lock().await.find(path)
}