1pub mod gossip;
5pub mod item;
6pub mod table;
7
8use crate::{
9 exec::{
10 item::{ItemGet, ItemList, ItemSet},
11 table::{TableCommit, TableDelete, TablePrepare},
12 },
13 peer::Peer,
14 state::{Table, TableData, TableParams, TableStatus},
15};
16use anyhow::Result;
17use axum::body::Body;
18use bytes::Bytes;
19use derive_more::From;
20use gossip::{GossipFind, GossipSync};
21use http::{Request, Response};
22use http_body_util::{BodyExt, Full};
23use hyper::{client::conn::http2::SendRequest, server::conn::http2, service::service_fn};
24use hyper_util::rt::{TokioExecutor, TokioIo};
25use serde::{de::DeserializeOwned, Deserialize, Serialize};
26use std::fmt::Debug;
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::RwLock as StdRwLock;
29use std::{collections::HashMap, future::Future, sync::LazyLock};
30use tokio::net::{TcpListener, TcpStream};
31use tokio_util::sync::CancellationToken;
32
33async fn keep_peer<O>(peer: &Peer, task: impl Future<Output = O>) -> (&Peer, O) {
35 (peer, task.await)
36}
37
38#[derive(Debug, Serialize, Deserialize, From)]
40pub enum Operations {
41 ItemGet(ItemGet),
42 ItemSet(ItemSet),
43 ItemList(ItemList),
44 TablePrepare(TablePrepare),
45 TableCommit(TableCommit),
46 TableDelete(TableDelete),
47 GossipSync(GossipSync),
48 GossipFind(GossipFind),
49}
50
51impl Operations {
52 #[allow(dead_code)]
54 fn name(&self) -> &'static str {
55 match self {
56 Operations::ItemGet(_) => "item-get",
57 Operations::ItemSet(_) => "item-set",
58 Operations::ItemList(_) => "item-list",
59 Operations::TablePrepare(_) => "table-prepare",
60 Operations::TableCommit(_) => "table-commit",
61 Operations::TableDelete(_) => "table-delete",
62 Operations::GossipSync(_) => "gossip-sync",
63 Operations::GossipFind(_) => "gossip-find",
64 }
65 }
66
67 pub async fn listener(listener: TcpListener, token: CancellationToken) -> Result<()> {
71 async fn recv(req: Request<hyper::body::Incoming>) -> Result<Response<Body>> {
72 let body = req.into_body();
73 let data = body.collect().await?.to_bytes();
74 let variant: Operations = postcard::from_bytes(&data)?;
75 let body = match variant {
78 Operations::ItemGet(get) => get.remote().await?,
79 Operations::ItemSet(set) => set.remote().await?,
80 Operations::ItemList(list) => list.remote().await?,
81 Operations::TablePrepare(prepare) => prepare.remote().await?,
82 Operations::TableCommit(commit) => commit.remote().await?,
83 Operations::TableDelete(delete) => delete.remote().await?,
84 Operations::GossipSync(sync) => sync.remote().await?,
85 Operations::GossipFind(find) => find.remote().await?,
86 };
87
88 Ok(Response::builder().status(200).body(body)?)
89 }
90
91 loop {
92 let result = tokio::select! {
93 _ = token.cancelled() => break,
94 result = listener.accept() => result,
95 };
96 let (stream, _peer) = result?;
97 stream.set_nodelay(true)?;
98 let io = TokioIo::new(stream);
99
100 tokio::spawn(async move {
101 if let Err(err) = http2::Builder::new(TokioExecutor::new())
102 .serve_connection(io, service_fn(recv))
103 .await
104 {
105 log::warn!("Error serving connection: {:?}", err);
106 }
107 });
108 }
109
110 Ok(())
111 }
112}
113
114type Connection = SendRequest<Full<Bytes>>;
115
116struct ConnectionQueue {
117 current: AtomicUsize,
118 queue: Vec<Connection>,
119}
120
121impl ConnectionQueue {
122 fn next(&self) -> Connection {
123 self.queue[self.current.fetch_add(1, Ordering::Relaxed) % self.queue.len()].clone()
124 }
125}
126
127static CONN_CACHE: LazyLock<StdRwLock<HashMap<String, ConnectionQueue>>> =
128 LazyLock::new(|| StdRwLock::new(HashMap::new()));
129
130async fn get_conn(peer: &str) -> Result<SendRequest<Full<Bytes>>> {
131 {
132 let cache = CONN_CACHE.read().expect("not poisoned");
133 if let Some(conn_queue) = cache.get(peer) {
134 let conn = conn_queue.next();
135 if !conn.is_closed() {
136 return Ok(conn);
137 }
138 }
139 }
140
141 let mut queue = vec![];
142 for _ in 0..20 {
143 let stream = TcpStream::connect(peer).await?;
144 let io = TokioIo::new(stream);
145
146 let (sender, connection) =
148 hyper::client::conn::http2::handshake(TokioExecutor::new(), io).await?;
149
150 tokio::spawn(async move {
152 if let Err(err) = connection.await {
153 log::warn!("Connection error: {:?}", err);
154 }
155 });
156
157 queue.push(sender);
158 }
159
160 let mut cache = CONN_CACHE.write().expect("poisoned");
161 let conn_queue = ConnectionQueue {
162 queue,
163 current: AtomicUsize::new(0),
164 };
165 let conn = conn_queue.next();
166 cache.insert(peer.to_owned(), conn_queue);
167 Ok(conn)
168}
169
170pub trait Rpc: Serialize + DeserializeOwned {
171 type Request: Serialize + DeserializeOwned + From<Self>;
172 type Response: Serialize + DeserializeOwned;
173
174 async fn exec(self, peer: &Peer) -> Result<Self::Response> {
175 if peer.is_local() {
176 self.handle().await
177 } else {
178 let mut sender = get_conn(&peer.addr).await?;
179
180 let req: Self::Request = self.into();
181 let bytes = postcard::to_allocvec(&req)?;
182 let req = Request::builder().body(Full::new(Bytes::from(bytes)))?;
183
184 let resp = sender.send_request(req).await?;
185 let data = resp.into_body().collect().await?.to_bytes();
186 Ok(postcard::from_bytes(&data)?)
187 }
188 }
189
190 async fn handle(self) -> Result<Self::Response>;
192
193 async fn remote(self) -> Result<Body> {
196 let result = self.handle().await?;
197 let bytes = postcard::to_allocvec(&result)?;
198 Ok(Body::new(Full::new(Bytes::from(bytes))))
199 }
200}