meta_client/client/
ask_leader.rs1use std::fmt::Debug;
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
21use async_trait::async_trait;
22use common_grpc::channel_manager::ChannelManager;
23use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::warn;
26use rand::seq::SliceRandom;
27use snafu::ResultExt;
28use tokio::time::timeout;
29use tonic::transport::Channel;
30
31use crate::client::Id;
32use crate::error;
33use crate::error::Result;
34
35pub type LeaderProviderRef = Arc<dyn LeaderProvider>;
36
37#[async_trait]
39pub trait LeaderProvider: Debug + Send + Sync {
40 fn leader(&self) -> Option<String>;
43
44 async fn ask_leader(&self) -> Result<String>;
46}
47
/// The set of candidate peers together with the leader discovered among them.
#[derive(Debug)]
struct LeadershipGroup {
    /// Leader value recorded by the most recent successful lookup; `None`
    /// until a lookup has succeeded.
    leader: Option<String>,
    /// Peer addresses that are queried when looking for the leader.
    peers: Vec<String>,
}
53
54#[derive(Clone, Debug)]
55pub struct AskLeader {
56 id: Id,
57 role: Role,
58 leadership_group: Arc<RwLock<LeadershipGroup>>,
59 channel_manager: ChannelManager,
60 max_retry: usize,
61}
62
63impl AskLeader {
64 pub fn new(
65 id: Id,
66 role: Role,
67 peers: impl Into<Vec<String>>,
68 channel_manager: ChannelManager,
69 max_retry: usize,
70 ) -> Self {
71 let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
72 leader: None,
73 peers: peers.into(),
74 }));
75 Self {
76 id,
77 role,
78 leadership_group,
79 channel_manager,
80 max_retry,
81 }
82 }
83
84 pub fn get_leader(&self) -> Option<String> {
85 self.leadership_group.read().unwrap().leader.clone()
86 }
87
88 async fn ask_leader_inner(&self) -> Result<String> {
89 let mut peers = {
90 let leadership_group = self.leadership_group.read().unwrap();
91 leadership_group.peers.clone()
92 };
93 peers.shuffle(&mut rand::rng());
94
95 let req = AskLeaderRequest {
96 header: Some(RequestHeader::new(
97 self.id,
98 self.role,
99 TracingContext::from_current_span().to_w3c(),
100 )),
101 };
102
103 let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
104 let channel_manager = self.channel_manager.clone();
105
106 for addr in &peers {
107 let mut client = self.create_asker(addr)?;
108 let tx_clone = tx.clone();
109 let req = req.clone();
110 let addr = addr.clone();
111 let channel_manager = channel_manager.clone();
112 tokio::spawn(async move {
113 match client.ask_leader(req).await {
114 Ok(res) => {
115 if let Some(endpoint) = res.into_inner().leader {
116 let _ = tx_clone.send(endpoint.addr).await;
117 } else {
118 warn!("No leader from: {addr}");
119 };
120 }
121 Err(status) => {
122 Self::reset_channels_with_manager(
125 &channel_manager,
126 std::slice::from_ref(&addr),
127 );
128 warn!("Failed to ask leader from: {addr}, {status}");
129 }
130 }
131 });
132 }
133
134 let leader = match timeout(
135 self.channel_manager
136 .config()
137 .timeout
138 .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
139 rx.recv(),
140 )
141 .await
142 {
143 Ok(Some(leader)) => leader,
144 Ok(None) => return error::NoLeaderSnafu.fail(),
145 Err(e) => {
146 Self::reset_channels_with_manager(&self.channel_manager, &peers);
149 return Err(e).context(error::AskLeaderTimeoutSnafu);
150 }
151 };
152
153 let mut leadership_group = self.leadership_group.write().unwrap();
154 leadership_group.leader = Some(leader.clone());
155
156 Ok(leader)
157 }
158
159 pub async fn ask_leader(&self) -> Result<String> {
160 let mut times = 0;
161 while times < self.max_retry {
162 match self.ask_leader_inner().await {
163 Ok(res) => {
164 return Ok(res);
165 }
166 Err(err) => {
167 warn!("Failed to ask leader, source: {err}, retry {times} times");
168 times += 1;
169 continue;
170 }
171 }
172 }
173
174 error::RetryTimesExceededSnafu {
175 msg: "Failed to ask leader",
176 times: self.max_retry,
177 }
178 .fail()
179 }
180
181 fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
182 Ok(HeartbeatClient::new(
183 self.channel_manager
184 .get(addr)
185 .context(error::CreateChannelSnafu)?,
186 ))
187 }
188
189 fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
191 if peers.is_empty() {
192 return;
193 }
194
195 channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
196 }
197}
198
199#[async_trait]
200impl LeaderProvider for AskLeader {
201 fn leader(&self) -> Option<String> {
202 self.get_leader()
203 }
204
205 async fn ask_leader(&self) -> Result<String> {
206 self.ask_leader().await
207 }
208}