ktd/
peer.rs

1//! Handles Peer discovery (see [Discovery]) and self identification.
2//! Also handles this peer availability zone.
3
4use clap::Parser;
5use std::{
6    convert::Infallible,
7    net::{IpAddr, SocketAddr, ToSocketAddrs},
8    sync::OnceLock,
9};
10use uuid::Uuid;
11use xxhash_rust::xxh3::xxh3_128;
12
13static PEERS: OnceLock<Vec<Peer>> = OnceLock::new();
14static LOCAL_INDEX: OnceLock<usize> = OnceLock::new();
15static AVAILABILITY_ZONE: OnceLock<String> = OnceLock::new();
16
17#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
18pub struct Peer {
19    pub addr: String,
20    _marker: (),
21}
22
23impl Peer {
24    pub fn is_local(&self) -> bool {
25        std::ptr::eq(self, local())
26    }
27}
28
29pub fn peers() -> &'static [Peer] {
30    PEERS.get().expect("peers list is uninitialized")
31}
32
33pub fn local_index() -> usize {
34    *LOCAL_INDEX.get().expect("local index is uninitialized")
35}
36
37pub fn local() -> &'static Peer {
38    &peers()[local_index()]
39}
40
41pub fn availability_zone() -> &'static str {
42    AVAILABILITY_ZONE
43        .get()
44        .expect("availability zone is uninitialized")
45}
46
47#[derive(Debug, Clone)]
48enum Discovery {
49    Dns(String),
50    List(String),
51}
52
53impl Discovery {
54    fn value_parser(value: &str) -> Result<Discovery, Infallible> {
55        if let Some(stripped) = value.strip_prefix("dns:") {
56            Ok(Discovery::Dns(stripped.to_string()))
57        } else {
58            Ok(Discovery::List(value.to_string()))
59        }
60    }
61}
62
63#[derive(Debug, Parser)]
64pub struct PeerCli {
65    #[arg(long = "peers", value_parser = Discovery::value_parser)]
66    discovery: Discovery,
67
68    #[arg(long, default_value = None)]
69    availability_zone: Option<String>,
70}
71
72pub fn init(cli: PeerCli, local_addr: SocketAddr) {
73    let mut peers: Vec<_> = match &cli.discovery {
74        Discovery::Dns(v) => v
75            .to_socket_addrs()
76            .expect("could not resolve address")
77            .map(|a| Peer {
78                addr: a.to_string(),
79                _marker: (),
80            })
81            .collect(),
82        Discovery::List(l) => l
83            .split(',')
84            .map(|a| Peer {
85                addr: a.to_owned(),
86                _marker: (),
87            })
88            .collect(),
89    };
90
91    peers.sort();
92
93    let local_index = peers
94        .iter()
95        .position(|p| is_self(local_addr, &p.addr))
96        .expect("self is not in peers");
97
98    let zone = cli.availability_zone.unwrap_or_else(|| {
99        Uuid::new_v8(xxh3_128(peers[local_index].addr.as_bytes()).to_ne_bytes()).to_string()
100    });
101
102    PEERS
103        .set(peers)
104        .expect("peers should not be initialized!!!");
105
106    LOCAL_INDEX
107        .set(local_index)
108        .expect("local index already initialized");
109
110    AVAILABILITY_ZONE
111        .set(zone)
112        .expect("availability zone already initialized");
113}
114
115fn is_self(local_addr: SocketAddr, addr: &str) -> bool {
116    fn is_local_ip(ip: IpAddr) -> bool {
117        let interfaces = local_ip_address::list_afinet_netifas().unwrap();
118        for (_, iface_ip) in interfaces {
119            if iface_ip == ip {
120                return true;
121            }
122        }
123
124        false
125    }
126
127    for addr in addr.to_socket_addrs().expect("could not resolve address") {
128        let ip_ok = if local_addr.ip().is_unspecified() {
129            is_local_ip(addr.ip())
130        } else {
131            local_addr.ip() == addr.ip()
132        };
133
134        let port_ok = local_addr.port() == addr.port();
135
136        if ip_ok && port_ok {
137            return true;
138        }
139    }
140
141    false
142}