ktd/exec/
mod.rs

1//! Implements RPCs and uses them to execute actions on the cluster.<br>
2//! Exposes methods for client's usage.
3
4pub mod gossip;
5pub mod item;
6pub mod table;
7
8use crate::{
9    exec::{
10        item::{ItemGet, ItemList, ItemSet},
11        table::{TableCommit, TableDelete, TablePrepare},
12    },
13    peer::Peer,
14    state::{Table, TableData, TableParams, TableStatus},
15};
16use anyhow::Result;
17use axum::body::Body;
18use bytes::Bytes;
19use derive_more::From;
20use gossip::{GossipFind, GossipSync};
21use http::{Request, Response};
22use http_body_util::{BodyExt, Full};
23use hyper::{client::conn::http2::SendRequest, server::conn::http2, service::service_fn};
24use hyper_util::rt::{TokioExecutor, TokioIo};
25use serde::{de::DeserializeOwned, Deserialize, Serialize};
26use std::fmt::Debug;
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::RwLock as StdRwLock;
29use std::{collections::HashMap, future::Future, sync::LazyLock};
30use tokio::net::{TcpListener, TcpStream};
31use tokio_util::sync::CancellationToken;
32
33/// Await the task (usually some RPC) and keep its peer to aid debugging.
34async fn keep_peer<O>(peer: &Peer, task: impl Future<Output = O>) -> (&Peer, O) {
35    (peer, task.await)
36}
37
/// All known RPCs.
///
/// Each variant wraps the payload type of one RPC. The listener deserializes an
/// incoming request body into this enum (via `postcard`) and dispatches on the variant.
/// `From` here is `derive_more::From`; it is expected to provide a conversion from each
/// payload type into its variant — confirm against the `derive_more` version in use.
#[derive(Debug, Serialize, Deserialize, From)]
pub enum Operations {
    /// Payload of the `item-get` RPC.
    ItemGet(ItemGet),
    /// Payload of the `item-set` RPC.
    ItemSet(ItemSet),
    /// Payload of the `item-list` RPC.
    ItemList(ItemList),
    /// Payload of the `table-prepare` RPC.
    TablePrepare(TablePrepare),
    /// Payload of the `table-commit` RPC.
    TableCommit(TableCommit),
    /// Payload of the `table-delete` RPC.
    TableDelete(TableDelete),
    /// Payload of the `gossip-sync` RPC.
    GossipSync(GossipSync),
    /// Payload of the `gossip-find` RPC.
    GossipFind(GossipFind),
}
50
51impl Operations {
52    /// Returns the RPC name for debugging purposes.
53    #[allow(dead_code)]
54    fn name(&self) -> &'static str {
55        match self {
56            Operations::ItemGet(_) => "item-get",
57            Operations::ItemSet(_) => "item-set",
58            Operations::ItemList(_) => "item-list",
59            Operations::TablePrepare(_) => "table-prepare",
60            Operations::TableCommit(_) => "table-commit",
61            Operations::TableDelete(_) => "table-delete",
62            Operations::GossipSync(_) => "gossip-sync",
63            Operations::GossipFind(_) => "gossip-find",
64        }
65    }
66
67    /// Runs the RPC listener.
68    ///
69    /// Waits for incoming requests, deserializes them and executes the appropriate action.
70    pub async fn listener(listener: TcpListener, token: CancellationToken) -> Result<()> {
71        async fn recv(req: Request<hyper::body::Incoming>) -> Result<Response<Body>> {
72            let body = req.into_body();
73            let data = body.collect().await?.to_bytes();
74            let variant: Operations = postcard::from_bytes(&data)?;
75            // log::debug!("Request from {}: {}", stream.peer_addr()?, variant.name());
76
77            let body = match variant {
78                Operations::ItemGet(get) => get.remote().await?,
79                Operations::ItemSet(set) => set.remote().await?,
80                Operations::ItemList(list) => list.remote().await?,
81                Operations::TablePrepare(prepare) => prepare.remote().await?,
82                Operations::TableCommit(commit) => commit.remote().await?,
83                Operations::TableDelete(delete) => delete.remote().await?,
84                Operations::GossipSync(sync) => sync.remote().await?,
85                Operations::GossipFind(find) => find.remote().await?,
86            };
87
88            Ok(Response::builder().status(200).body(body)?)
89        }
90
91        loop {
92            let result = tokio::select! {
93                _ = token.cancelled() => break,
94                result = listener.accept() => result,
95            };
96            let (stream, _peer) = result?;
97            stream.set_nodelay(true)?;
98            let io = TokioIo::new(stream);
99
100            tokio::spawn(async move {
101                if let Err(err) = http2::Builder::new(TokioExecutor::new())
102                    .serve_connection(io, service_fn(recv))
103                    .await
104                {
105                    log::warn!("Error serving connection: {:?}", err);
106                }
107            });
108        }
109
110        Ok(())
111    }
112}
113
/// Request-sending handle of an HTTP/2 client connection whose request bodies are
/// single in-memory byte buffers (`Full<Bytes>`).
type Connection = SendRequest<Full<Bytes>>;
115
/// A group of connections that are handed out in rotation by `ConnectionQueue::next`.
struct ConnectionQueue {
    /// Counter bumped on every `next` call; taken modulo `queue.len()` to pick the slot.
    current: AtomicUsize,
    /// The connections to rotate through.
    queue: Vec<Connection>,
}
120
121impl ConnectionQueue {
122    fn next(&self) -> Connection {
123        self.queue[self.current.fetch_add(1, Ordering::Relaxed) % self.queue.len()].clone()
124    }
125}
126
/// Lazily initialised, process-wide cache of connection queues, keyed by the peer
/// address string that `get_conn` is called with.
static CONN_CACHE: LazyLock<StdRwLock<HashMap<String, ConnectionQueue>>> =
    LazyLock::new(|| StdRwLock::new(HashMap::new()));
129
130async fn get_conn(peer: &str) -> Result<SendRequest<Full<Bytes>>> {
131    {
132        let cache = CONN_CACHE.read().expect("not poisoned");
133        if let Some(conn_queue) = cache.get(peer) {
134            let conn = conn_queue.next();
135            if !conn.is_closed() {
136                return Ok(conn);
137            }
138        }
139    }
140
141    let mut queue = vec![];
142    for _ in 0..20 {
143        let stream = TcpStream::connect(peer).await?;
144        let io = TokioIo::new(stream);
145
146        // Perform an HTTP/2 handshake over the TCP connection
147        let (sender, connection) =
148            hyper::client::conn::http2::handshake(TokioExecutor::new(), io).await?;
149
150        // Spawn the connection driver (handles incoming frames)
151        tokio::spawn(async move {
152            if let Err(err) = connection.await {
153                log::warn!("Connection error: {:?}", err);
154            }
155        });
156
157        queue.push(sender);
158    }
159
160    let mut cache = CONN_CACHE.write().expect("poisoned");
161    let conn_queue = ConnectionQueue {
162        queue,
163        current: AtomicUsize::new(0),
164    };
165    let conn = conn_queue.next();
166    cache.insert(peer.to_owned(), conn_queue);
167    Ok(conn)
168}
169
170pub trait Rpc: Serialize + DeserializeOwned {
171    type Request: Serialize + DeserializeOwned + From<Self>;
172    type Response: Serialize + DeserializeOwned;
173
174    async fn exec(self, peer: &Peer) -> Result<Self::Response> {
175        if peer.is_local() {
176            self.handle().await
177        } else {
178            let mut sender = get_conn(&peer.addr).await?;
179
180            let req: Self::Request = self.into();
181            let bytes = postcard::to_allocvec(&req)?;
182            let req = Request::builder().body(Full::new(Bytes::from(bytes)))?;
183
184            let resp = sender.send_request(req).await?;
185            let data = resp.into_body().collect().await?.to_bytes();
186            Ok(postcard::from_bytes(&data)?)
187        }
188    }
189
190    /// Implements the RPC on this node, automatically wrapped by the trait on remotes.
191    async fn handle(self) -> Result<Self::Response>;
192
193    /// Wraps the locally produced value into an HTTP Body.
194    /// Can be overridden to provide a faster path.
195    async fn remote(self) -> Result<Body> {
196        let result = self.handle().await?;
197        let bytes = postcard::to_allocvec(&result)?;
198        Ok(Body::new(Full::new(Bytes::from(bytes))))
199    }
200}