1use clap::Parser;
5use std::{
6 convert::Infallible,
7 net::{IpAddr, SocketAddr, ToSocketAddrs},
8 sync::OnceLock,
9};
10use uuid::Uuid;
11use xxhash_rust::xxh3::xxh3_128;
12
/// Process-wide peer list. Written exactly once by `init`, after the list has been sorted.
static PEERS: OnceLock<Vec<Peer>> = OnceLock::new();
/// Index of this node's own entry inside `PEERS`. Written exactly once by `init`.
static LOCAL_INDEX: OnceLock<usize> = OnceLock::new();
/// Availability-zone label of this node. Written exactly once by `init`: either the value
/// supplied on the command line or an identifier derived from the local peer address.
static AVAILABILITY_ZONE: OnceLock<String> = OnceLock::new();
16
/// One member of the cluster, identified by its address text.
///
/// Equality and ordering are derived, so they are decided by `addr` (the unit marker
/// never differs between two values).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Peer {
    /// Address text exactly as produced by discovery.
    pub addr: String,
    /// Private zero-sized field: prevents code outside this module from building a
    /// `Peer` with a struct literal.
    _marker: (),
}
22
23impl Peer {
24 pub fn is_local(&self) -> bool {
25 std::ptr::eq(self, local())
26 }
27}
28
29pub fn peers() -> &'static [Peer] {
30 PEERS.get().expect("peers list is uninitialized")
31}
32
33pub fn local_index() -> usize {
34 *LOCAL_INDEX.get().expect("local index is uninitialized")
35}
36
37pub fn local() -> &'static Peer {
38 &peers()[local_index()]
39}
40
41pub fn availability_zone() -> &'static str {
42 AVAILABILITY_ZONE
43 .get()
44 .expect("availability zone is uninitialized")
45}
46
/// How the peer set was specified on the command line.
#[derive(Clone, Debug)]
enum Discovery {
    /// Value that followed a `dns:` prefix (the prefix itself is removed).
    Dns(String),
    /// Raw value without a `dns:` prefix, stored unchanged.
    List(String),
}

impl Discovery {
    /// Value parser for the `--peers` option.
    ///
    /// Input starting with `dns:` becomes [`Discovery::Dns`] holding the remainder;
    /// every other input becomes [`Discovery::List`] holding the input verbatim.
    /// Parsing cannot fail, hence the `Infallible` error type.
    fn value_parser(value: &str) -> Result<Discovery, Infallible> {
        let parsed = match value.strip_prefix("dns:") {
            Some(target) => Discovery::Dns(target.to_owned()),
            None => Discovery::List(value.to_owned()),
        };
        Ok(parsed)
    }
}
62
// Command-line options describing the cluster peers, parsed through clap's derive API.
// NOTE(review): plain `//` comments are used here on purpose — clap's derive picks up
// `///` doc comments as help text, which would alter the `--help` output.
#[derive(Debug, Parser)]
pub struct PeerCli {
    // `--peers <value>`: converted by `Discovery::value_parser`. A `dns:` prefix selects
    // DNS discovery; any other value is kept as a comma-separated peer list.
    #[arg(long = "peers", value_parser = Discovery::value_parser)]
    discovery: Discovery,

    // Optional zone label (long option name derived by clap from the field name).
    // When absent, `init` derives an identifier from the local peer's address instead.
    #[arg(long, default_value = None)]
    availability_zone: Option<String>,
}
71
72pub fn init(cli: PeerCli, local_addr: SocketAddr) {
73 let mut peers: Vec<_> = match &cli.discovery {
74 Discovery::Dns(v) => v
75 .to_socket_addrs()
76 .expect("could not resolve address")
77 .map(|a| Peer {
78 addr: a.to_string(),
79 _marker: (),
80 })
81 .collect(),
82 Discovery::List(l) => l
83 .split(',')
84 .map(|a| Peer {
85 addr: a.to_owned(),
86 _marker: (),
87 })
88 .collect(),
89 };
90
91 peers.sort();
92
93 let local_index = peers
94 .iter()
95 .position(|p| is_self(local_addr, &p.addr))
96 .expect("self is not in peers");
97
98 let zone = cli.availability_zone.unwrap_or_else(|| {
99 Uuid::new_v8(xxh3_128(peers[local_index].addr.as_bytes()).to_ne_bytes()).to_string()
100 });
101
102 PEERS
103 .set(peers)
104 .expect("peers should not be initialized!!!");
105
106 LOCAL_INDEX
107 .set(local_index)
108 .expect("local index already initialized");
109
110 AVAILABILITY_ZONE
111 .set(zone)
112 .expect("availability zone already initialized");
113}
114
115fn is_self(local_addr: SocketAddr, addr: &str) -> bool {
116 fn is_local_ip(ip: IpAddr) -> bool {
117 let interfaces = local_ip_address::list_afinet_netifas().unwrap();
118 for (_, iface_ip) in interfaces {
119 if iface_ip == ip {
120 return true;
121 }
122 }
123
124 false
125 }
126
127 for addr in addr.to_socket_addrs().expect("could not resolve address") {
128 let ip_ok = if local_addr.ip().is_unspecified() {
129 is_local_ip(addr.ip())
130 } else {
131 local_addr.ip() == addr.ip()
132 };
133
134 let port_ok = local_addr.port() == addr.port();
135
136 if ip_ok && port_ok {
137 return true;
138 }
139 }
140
141 false
142}