1mod bandwidth;
4mod store;
5
6use crate::state::Error::TooMuchBandwidth;
7use crate::{
8 merkle::{self, Merkle},
9 peer::{self, local_index, Peer},
10};
11use bytes::Bytes;
12use clap::Parser;
13use serde::{Deserialize, Serialize};
14use std::num::NonZeroU64;
15use std::sync::RwLock as StdRwLock;
16use std::{
17 collections::BTreeMap,
18 sync::atomic::{AtomicU64, Ordering},
19};
20use std::{collections::HashMap, sync::LazyLock};
21use store::KeyValue;
22use tokio::{
23 sync::{Mutex, RwLock},
24 time::Instant,
25};
26use uuid::Uuid;
27use xxhash_rust::xxh3::xxh3_64;
28
/// Identifier of the reserved store table that holds table metadata.
///
/// Entries in it are keyed by the table id bytes and hold the
/// postcard-encoded [`TableStatus`] of that table.
const META: Uuid = Uuid::new_v8(*b"kitsuraimetadata");
30
/// Lifecycle state of a table, as persisted in the metadata store.
///
/// `TableStatus::age` orders the variants `Prepared` < `Created` < `Deleted`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TableStatus {
    /// Bandwidth has been reserved locally, but the table is not committed yet.
    Prepared { allocated: NonZeroU64 },
    /// The table is committed; carries its allocation and parameters.
    Created(TableData),
    /// The table has been deleted; it holds no local bandwidth.
    Deleted,
}
38
39impl TableStatus {
40 fn age(&self) -> usize {
41 match self {
42 TableStatus::Prepared { .. } => 1,
43 TableStatus::Created(_) => 2,
44 TableStatus::Deleted => 3,
45 }
46 }
47
48 fn local_bandwidth(&self) -> u64 {
49 match self {
50 &TableStatus::Prepared { allocated } => allocated.get(),
51 TableStatus::Created(TableData { allocation, .. }) => allocation
52 .get(&(local_index() as u64))
53 .copied()
54 .map_or(0, NonZeroU64::get),
55 TableStatus::Deleted => 0,
56 }
57 }
58}
59
/// Numeric parameters of a table.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct TableParams {
    /// Together with `n`, defines the `b * n` virtual slots keys are hashed onto.
    pub b: u64,
    /// Number of peers selected for each key.
    pub n: u64,
    /// NOTE(review): not used in this file; presumably the read quorum - confirm.
    pub r: u64,
    /// NOTE(review): not used in this file; presumably the write quorum - confirm.
    pub w: u64,
}
67
/// Payload of a created table: who serves it and with which parameters.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TableData {
    /// Non-zero share per peer, keyed by the peer's index into `peer::peers()`.
    pub allocation: BTreeMap<u64, NonZeroU64>,
    /// Parameters used to map keys onto peers.
    pub params: TableParams,
}
73
74impl TableData {
75 fn virt_to_peer(&self, virt: u64) -> &'static Peer {
77 let TableParams { b, n, .. } = self.params;
78 let ord = virt / n + virt % n * b;
79 let index = self
80 .allocation
81 .iter()
82 .scan(0, |acc, (&k, &v)| {
83 *acc += v.get();
84 Some((k, *acc))
85 })
86 .find(|&(_, acc)| acc > ord)
87 .expect("index should be less than sigma")
88 .0;
89
90 &peer::peers()[index as usize]
91 }
92
93 pub fn peers(&self) -> impl Iterator<Item = &'static Peer> + use<'_> {
94 self.allocation.keys().map(|&x| &peer::peers()[x as usize])
95 }
96
97 pub fn peers_for_key(&self, key: &[u8]) -> impl Iterator<Item = &'static Peer> + use<'_> {
99 let hash = xxh3_64(key);
100 let TableParams { b, n, .. } = self.params;
101 let virt = ((hash as u128 * (b * n) as u128) >> 64) as u64;
102 (0..n).map(move |i| self.virt_to_peer(virt + i))
103 }
104}
105
/// Async lock serialising access to table metadata: taken for reading by
/// `Table::list` / `Table::load` and for writing by every mutating operation.
static LOCK: LazyLock<RwLock<()>> = LazyLock::new(|| RwLock::new(()));
/// Merkle structure fed with the encoded status of created and deleted tables,
/// keyed by the table id as a little-endian `u128`.
static MERKLE: LazyLock<Mutex<Merkle>> = LazyLock::new(|| Mutex::new(Merkle::new()));
/// Reference instant against which scheduling slots are measured.
static SCHED_BASE_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);
/// Per-table slot counter used by `sched_wait`; an entry exists for each
/// table registered as created.
static SCHED_AVAIL_AT: LazyLock<StdRwLock<HashMap<Uuid, AtomicU64>>> =
    LazyLock::new(|| StdRwLock::new(HashMap::new()));
/// Single-entry cache holding the table most recently loaded from or saved to
/// the store. It starts out as a deleted table with the nil id.
static TABLE_CACHE: LazyLock<StdRwLock<Table>> = LazyLock::new(|| {
    StdRwLock::new(Table {
        id: Uuid::nil(),
        status: TableStatus::Deleted,
    })
});
117
/// A table identifier together with its current lifecycle status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Table {
    /// Unique identifier of the table; its bytes are the key in the metadata store.
    pub id: Uuid,
    /// Current lifecycle status of the table.
    pub status: TableStatus,
}
123
/// Errors returned by the table state operations in this module.
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)]
pub enum Error {
    /// The table does not exist or is not in the created state.
    #[error("table not found")]
    NotFound,
    /// A table with this id is already stored.
    #[error("table exists")]
    Exists,
    /// A commit was attempted for a table that has no stored record.
    #[error("table not prepared")]
    NotPrepared,
    /// A commit was attempted for a table that is no longer in the prepared state.
    #[error("expired table")]
    Expired,
    /// The requested local bandwidth exceeds what is reserved or available.
    #[error("bandwidth error")]
    TooMuchBandwidth,
    /// A save would not move the table forward in its lifecycle.
    #[error("not a growing save")]
    NotGrowing,
    /// The underlying store reported an error.
    #[error("store error")]
    Store(#[from] store::Error),
}
141
142impl Table {
143 pub async fn list() -> Result<Vec<Self>, Error> {
144 let _guard = LOCK.read().await;
145
146 log::debug!("list");
147 let tables = store::item_list(META)
148 .await?
149 .into_iter()
150 .map(|(key, value)| Self {
151 id: Uuid::from_slice(&key).expect("id deserialization failed"),
152 status: postcard::from_bytes(value.as_ref()).expect("table deserialization failed"),
153 })
154 .collect();
155
156 Ok(tables)
157 }
158
159 async fn locked_load(id: Uuid) -> Result<Option<Self>, Error> {
160 log::debug!("{id} load");
161 assert!(LOCK.try_write().is_err());
162
163 {
164 let cache = TABLE_CACHE.read().expect("poisoned");
165 if cache.id == id {
166 return Ok(Some(cache.clone()));
167 }
168 }
169 let blob = store::item_get(META, id.as_bytes()).await?;
170 let table = blob.map(|blob| Self {
171 id,
172 status: postcard::from_bytes(blob.as_ref()).expect("table deserialization failed"),
173 });
174 {
175 if let Some(table) = &table {
176 *TABLE_CACHE.write().expect("poisoned") = table.clone();
177 }
178 }
179 Ok(table)
180 }
181
182 pub async fn load(id: Uuid) -> Result<Option<Self>, Error> {
183 let _guard = LOCK.read().await;
184 Self::locked_load(id).await
185 }
186
187 async fn locked_save(&self) -> Result<(), Error> {
188 log::debug!("{} save", self.id);
189 assert!(LOCK.try_read().is_err());
190
191 let blob = postcard::to_allocvec(&self.status).expect("serialization failed");
192 store::item_set(META, self.id.as_bytes(), &blob).await?;
193
194 match &self.status {
195 TableStatus::Prepared { .. } => {}
196 TableStatus::Created(_) => {
197 MERKLE.lock().await.insert(self.id.to_u128_le(), &blob);
198 SCHED_AVAIL_AT
199 .write()
200 .expect("poisoned")
201 .insert(self.id, AtomicU64::new(0));
202 }
203 TableStatus::Deleted => {
204 MERKLE.lock().await.insert(self.id.to_u128_le(), &blob);
205 SCHED_AVAIL_AT.write().expect("poisoned").remove(&self.id);
206 }
207 }
208
209 {
211 *TABLE_CACHE.write().expect("poisoned") = self.clone();
212 }
213
214 Ok(())
215 }
216
217 pub async fn prepare(id: Uuid, requested: u64) -> Result<Option<NonZeroU64>, Error> {
218 log::debug!("{id} prepare for {requested}");
219 let _guard = LOCK.write().await;
220
221 let table = Self::locked_load(id).await?;
222 if table.is_some() {
223 return Err(Error::Exists);
224 }
225
226 let Some(proposed) = NonZeroU64::new(bandwidth::alloc(requested)) else {
227 return Ok(None);
228 };
229
230 Table {
231 id,
232 status: TableStatus::Prepared {
233 allocated: proposed,
234 },
235 }
236 .locked_save()
237 .await?;
238
239 Ok(Some(proposed))
240 }
241
242 pub async fn commit(id: Uuid, data: TableData) -> Result<(), Error> {
243 log::debug!("{id} commit");
244 let _guard = LOCK.write().await;
245
246 let table = Self::locked_load(id).await?.ok_or(Error::NotPrepared)?;
247
248 let TableStatus::Prepared {
249 allocated: pre_bandwidth,
250 } = table.status
251 else {
252 return Err(Error::Expired);
253 };
254
255 let post_bandwidth = data
256 .allocation
257 .get(&(local_index() as u64))
258 .copied()
259 .map_or(0, NonZeroU64::get);
260
261 if post_bandwidth > pre_bandwidth.get() {
262 return Err(Error::TooMuchBandwidth);
263 }
264 bandwidth::free(pre_bandwidth.get() - post_bandwidth);
265
266 Table {
267 id,
268 status: TableStatus::Created(data),
269 }
270 .locked_save()
271 .await?;
272
273 Ok(())
274 }
275
276 pub async fn delete_if_prepared(id: Uuid) -> Result<Table, Error> {
277 log::debug!("{id} delete_if_prepared");
278 let _guard = LOCK.write().await;
279
280 let mut table = Self::locked_load(id)
281 .await
282 .ok()
283 .flatten()
284 .expect("table should exists");
285
286 if let TableStatus::Prepared { allocated } = table.status {
287 table.status = TableStatus::Deleted;
288 table.locked_save().await?;
289 bandwidth::free(allocated.get());
290 }
291 Ok(table)
292 }
293
294 pub async fn delete(id: Uuid) -> Result<(), Error> {
295 log::debug!("{id} delete");
296 let _guard = LOCK.write().await;
297
298 let mut table = Self::locked_load(id)
299 .await
300 .ok()
301 .flatten()
302 .expect("table should exists");
303
304 let allocated = table.status.local_bandwidth();
305 table.status = TableStatus::Deleted;
306 table.locked_save().await?;
307 bandwidth::free(allocated);
308 store::table_delete(id).await?;
309 Ok(())
310 }
311
312 pub async fn save(&self) -> Result<(), Error> {
313 log::debug!("{} save", self.id);
314 let _guard = LOCK.write().await;
315
316 let current = Self::locked_load(self.id).await.ok().flatten();
317 if current.as_ref().map_or(0, |t| t.status.age()) >= self.status.age() {
318 return Err(Error::NotGrowing);
319 }
320
321 let pre_bandwidth = current.map_or(0, |t| t.status.local_bandwidth());
322 let post_bandwidth = self.status.local_bandwidth();
323 if pre_bandwidth > post_bandwidth {
324 bandwidth::free(pre_bandwidth - post_bandwidth);
325 } else {
326 let diff = post_bandwidth - pre_bandwidth;
327 let allocated = bandwidth::alloc(diff);
328 if diff != allocated {
329 bandwidth::free(allocated);
330 return Err(TooMuchBandwidth);
331 };
332 }
333
334 self.locked_save().await?;
335
336 Ok(())
337 }
338}
339
// Command-line options for the state layer.
// Plain `//` comments are used on purpose: `///` on a clap-derived struct
// would become part of the generated help text.
#[derive(Debug, Parser)]
pub struct StateCli {
    // Options forwarded to `store::init`.
    #[clap(flatten)]
    store_cli: store::StoreCli,

    // Value handed to `bandwidth::init`; defaults to 100.
    #[arg(short, long, default_value = "100")]
    bandwidth: u64,
}
351
352pub async fn init(cli: StateCli) {
353 log::info!("Initialize state");
354 store::init(cli.store_cli);
355 bandwidth::init(cli.bandwidth);
356
357 LazyLock::force(&SCHED_BASE_TIME);
358
359 let mut merkle = MERKLE.lock().await;
360
361 for table in Table::list().await.expect("could not get table list") {
362 let table = Table::delete_if_prepared(table.id).await.unwrap();
363
364 let requested = table.status.local_bandwidth();
365 if bandwidth::alloc(requested) != requested {
366 panic!("Overflowed bandwidth on initialization. Database may be corrupted?");
367 }
368
369 let data = postcard::to_allocvec(&table.status).unwrap();
370 merkle.insert(table.id.to_u128_le(), &data);
371
372 if matches!(table.status, TableStatus::Created(_)) {
373 let mut available = SCHED_AVAIL_AT.write().expect("poisoned");
374 available.insert(table.id, AtomicU64::new(0));
375 }
376 }
377}
378
379async fn sched_wait(table: &Table) {
380 let bandwidth = table.status.local_bandwidth() as u32;
381
382 let instant = {
383 let available = SCHED_AVAIL_AT.read().expect("poisoned");
384 let available = available
385 .get(&table.id)
386 .expect("table should be registered");
387
388 let filter = SCHED_BASE_TIME.elapsed() - tokio::time::Duration::from_secs(1) / 10;
389 let filter = (filter * bandwidth).as_secs();
390 available.fetch_max(filter, Ordering::Relaxed);
391
392 let at = available.fetch_add(1, Ordering::Relaxed);
393 *SCHED_BASE_TIME + tokio::time::Duration::from_secs(at) / bandwidth
394 };
395 tokio::time::sleep_until(instant).await;
396}
397
398pub async fn item_get(table_id: Uuid, key: &[u8]) -> Result<Option<Bytes>, Error> {
399 let table = Table::load(table_id).await.unwrap();
401 if !table
402 .as_ref()
403 .is_some_and(|t| matches!(t.status, TableStatus::Created(_)))
404 {
405 return Err(Error::NotFound);
406 }
407 sched_wait(table.as_ref().unwrap()).await;
408 Ok(store::item_get(table_id, key).await?)
409}
410
411pub async fn item_set(table_id: Uuid, key: &[u8], value: &[u8]) -> Result<(), Error> {
412 let table = Table::load(table_id).await.unwrap();
414 if !table
415 .as_ref()
416 .is_some_and(|t| matches!(t.status, TableStatus::Created(_)))
417 {
418 return Err(Error::NotFound);
419 }
420 sched_wait(table.as_ref().unwrap()).await;
421 Ok(store::item_set(table_id, key, value).await?)
422}
423
424pub async fn item_list(table_id: Uuid) -> Result<Vec<KeyValue>, Error> {
425 let table = Table::load(table_id).await.unwrap();
427 if !table.is_some_and(|t| matches!(t.status, TableStatus::Created(_))) {
428 return Err(Error::NotFound);
429 }
430 Ok(store::item_list(table_id).await?)
431}
432
/// Locks the global merkle structure and forwards `path` to its `find` method,
/// returning whatever that lookup yields.
pub async fn merkle_find(path: merkle::Path) -> Option<(merkle::Path, u128)> {
    MERKLE.lock().await.find(path)
}