ktd/
main.rs

1mod exec;
2mod http;
3mod merkle;
4mod peer;
5mod state;
6
7use crate::exec::{gossip, Operations};
8use anyhow::Result;
9use clap::{arg, Parser};
10use std::{io::Write, time::Duration};
11use tokio::{net::TcpListener, signal::unix::SignalKind, try_join};
12use tokio_util::sync::CancellationToken;
13
/// Upper bound on how long a single RPC may take to complete.
///
/// An RPC that has not responded completely within this window
/// is counted as a failure.
///
/// Currently 10 seconds.
const TIMEOUT: Duration = Duration::from_secs(10);
21
/// How long a stored table may stay in the
/// [Prepared](state::TableStatus::Prepared) state.
///
/// A table that is neither committed nor explicitly deleted before this
/// timer expires is deleted automatically.
///
/// Currently 60 seconds.
const PREPARE_TIME: Duration = Duration::from_secs(60);
29
30/// `ktd`'s configuration and CLI interface.
31#[derive(Parser)]
32#[clap(author, version, about)]
33struct Cli {
34    /// The address where to bind the HTTP server to.
35    #[arg(long, default_value = "0.0.0.0:8000")]
36    http_addr: String,
37
38    /// The address where to bind the RPC server to.
39    #[arg(long, default_value = "0.0.0.0:3000")]
40    rpc_addr: String,
41
42    #[clap(flatten)]
43    state_cli: state::StateCli,
44
45    /// Configuration for peer discovery.
46    #[clap(flatten)]
47    peer_cli: peer::PeerCli,
48}
49
50/// Handles signals received by `ktd`, terminating cleanly.
51async fn killer(token: CancellationToken) -> Result<()> {
52    let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
53    let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
54    tokio::select! {
55        _ = sigint.recv() => log::info!("Received SIGINT."),
56        _ = sigterm.recv() => log::info!("Received SIGTERM."),
57    }
58    token.cancel();
59    Ok(())
60}
61
62/// Initializes the logger.
63///
64/// The following format is used: `RPC_ADDR LOG_LEVEL CALLEE | MESSAGE`<br>
65/// Usually the message will start with the table uuid.
66fn logger_init(rpc_addr: String) {
67    env_logger::Builder::from_default_env()
68        .format(move |buf, record| {
69            let s_addr = anstyle::Style::new().dimmed();
70
71            let s_lvl = buf.default_level_style(record.level());
72            let lvl = record.level();
73
74            let s_tgt = anstyle::Style::new().bold();
75            let tgt = record.target();
76
77            let args = record.args();
78
79            writeln!(
80                buf,
81                "{s_addr}{rpc_addr}{s_addr:#} {s_lvl}{lvl:5}{s_lvl:#} {s_tgt}{tgt:21}{s_tgt:#} | {args}"
82            )
83        })
84        .init();
85}
86
87/// Initializes all modules and starts all servers.
88#[tokio::main]
89async fn main() -> Result<()> {
90    // Parse configuration from the cli.
91    let Cli {
92        http_addr,
93        rpc_addr,
94        state_cli,
95        peer_cli,
96    } = Cli::parse();
97
98    logger_init(rpc_addr.clone());
99
100    // Bind the rpc address early.
101    let listener = TcpListener::bind(rpc_addr).await?;
102    // Initialize the static peer list, given our rpc address.
103    peer::init(peer_cli, listener.local_addr()?);
104    // Initialize the table metadata and gossip structures.
105    state::init(state_cli).await;
106
107    // Start the HTTP server and RPC server.
108    let token = CancellationToken::new();
109    let http = http::main(&http_addr, token.clone());
110    let rpc = Operations::listener(listener, token.clone());
111    let gossip = gossip::gossip(token.clone());
112    let killer = killer(token);
113    try_join!(http, rpc, gossip, killer)?;
114    Ok(())
115}