meta_client/client/
ask_leader.rs1use std::sync::{Arc, RwLock};
16use std::time::Duration;
17
18use api::v1::meta::heartbeat_client::HeartbeatClient;
19use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
20use common_grpc::channel_manager::ChannelManager;
21use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
22use common_telemetry::tracing_context::TracingContext;
23use common_telemetry::warn;
24use rand::seq::SliceRandom;
25use snafu::{OptionExt, ResultExt};
26use tokio::time::timeout;
27use tonic::transport::Channel;
28
29use crate::client::Id;
30use crate::error;
31use crate::error::Result;
32
/// The peers that can be asked for the leader, together with the most recently
/// learned leader address.
#[derive(Debug)]
struct LeadershipGroup {
    /// Address of the last leader reported by a peer; `None` until a lookup succeeds.
    leader: Option<String>,
    /// Addresses of the peers that are queried when looking for the leader.
    peers: Vec<String>,
}
38
39#[derive(Clone, Debug)]
40pub struct AskLeader {
41 id: Id,
42 role: Role,
43 leadership_group: Arc<RwLock<LeadershipGroup>>,
44 channel_manager: ChannelManager,
45 max_retry: usize,
46}
47
48impl AskLeader {
49 pub fn new(
50 id: Id,
51 role: Role,
52 peers: impl Into<Vec<String>>,
53 channel_manager: ChannelManager,
54 max_retry: usize,
55 ) -> Self {
56 let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
57 leader: None,
58 peers: peers.into(),
59 }));
60 Self {
61 id,
62 role,
63 leadership_group,
64 channel_manager,
65 max_retry,
66 }
67 }
68
69 pub fn get_leader(&self) -> Option<String> {
70 self.leadership_group.read().unwrap().leader.clone()
71 }
72
73 async fn ask_leader_inner(&self) -> Result<String> {
74 let mut peers = {
75 let leadership_group = self.leadership_group.read().unwrap();
76 leadership_group.peers.clone()
77 };
78 peers.shuffle(&mut rand::rng());
79
80 let req = AskLeaderRequest {
81 header: Some(RequestHeader::new(
82 self.id,
83 self.role,
84 TracingContext::from_current_span().to_w3c(),
85 )),
86 };
87
88 let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
89
90 for addr in &peers {
91 let mut client = self.create_asker(addr)?;
92 let tx_clone = tx.clone();
93 let req = req.clone();
94 let addr = addr.to_string();
95 tokio::spawn(async move {
96 match client.ask_leader(req).await {
97 Ok(res) => {
98 if let Some(endpoint) = res.into_inner().leader {
99 let _ = tx_clone.send(endpoint.addr).await;
100 } else {
101 warn!("No leader from: {addr}");
102 };
103 }
104 Err(status) => {
105 warn!("Failed to ask leader from: {addr}, {status}");
106 }
107 }
108 });
109 }
110
111 let leader = timeout(
112 self.channel_manager
113 .config()
114 .timeout
115 .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
116 rx.recv(),
117 )
118 .await
119 .context(error::AskLeaderTimeoutSnafu)?
120 .context(error::NoLeaderSnafu)?;
121
122 let mut leadership_group = self.leadership_group.write().unwrap();
123 leadership_group.leader = Some(leader.clone());
124
125 Ok(leader)
126 }
127
128 pub async fn ask_leader(&self) -> Result<String> {
129 let mut times = 0;
130 while times < self.max_retry {
131 match self.ask_leader_inner().await {
132 Ok(res) => {
133 return Ok(res);
134 }
135 Err(err) => {
136 warn!("Failed to ask leader, source: {err}, retry {times} times");
137 times += 1;
138 continue;
139 }
140 }
141 }
142
143 error::RetryTimesExceededSnafu {
144 msg: "Failed to ask leader",
145 times: self.max_retry,
146 }
147 .fail()
148 }
149
150 fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
151 Ok(HeartbeatClient::new(
152 self.channel_manager
153 .get(addr)
154 .context(error::CreateChannelSnafu)?,
155 ))
156 }
157}