meta_client/client/ask_leader.rs

use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use async_trait::async_trait;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;

use crate::client::Id;
use crate::error;
use crate::error::Result;

pub type LeaderProviderRef = Arc<dyn LeaderProvider>;

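/// A `LeaderProvider` answers which peer is currently the metasrv leader.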
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
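    /// Returns the currently known leader address, if any.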
    fn leader(&self) -> Option<String>;

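    /// Queries the peers for the current leader and returns its address.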
    async fn ask_leader(&self) -> Result<String>;
}

pub type LeaderProviderFactoryRef = Arc<dyn LeaderProviderFactory>;

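/// Creates [`LeaderProvider`]s for a given set of peer addresses.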
pub trait LeaderProviderFactory: Send + Sync + Debug {
    fn create(&self, peers: &[&str]) -> LeaderProviderRef;
}

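/// The set of known peers and the last leader address learned from them.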
#[derive(Debug)]
struct LeadershipGroup {
    leader: Option<String>,
    peers: Vec<String>,
}

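/// Discovers the metasrv leader by broadcasting an `AskLeaderRequest` to all
/// known peers and caching the first leader address reported back.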
#[derive(Clone, Debug)]
pub struct AskLeader {
    id: Id,
    role: Role,
    leadership_group: Arc<RwLock<LeadershipGroup>>,
    channel_manager: ChannelManager,
    max_retry: usize,
}

impl AskLeader {
    pub fn new(
        id: Id,
        role: Role,
        peers: impl Into<Vec<String>>,
        channel_manager: ChannelManager,
        max_retry: usize,
    ) -> Self {
        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
            leader: None,
            peers: peers.into(),
        }));
        Self {
            id,
            role,
            leadership_group,
            channel_manager,
            max_retry,
        }
    }

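    /// Returns the cached leader address, if one has been discovered.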
    pub fn get_leader(&self) -> Option<String> {
        self.leadership_group.read().unwrap().leader.clone()
    }

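    /// Broadcasts an `AskLeaderRequest` to every known peer concurrently and
    /// returns the first leader address reported back, caching it for
    /// [`Self::get_leader`]. Times out after the channel manager's configured
    /// timeout, falling back to `META_KEEP_ALIVE_INTERVAL_SECS`.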
    async fn ask_leader_inner(&self) -> Result<String> {
        let mut peers = {
            let leadership_group = self.leadership_group.read().unwrap();
            leadership_group.peers.clone()
        };
        peers.shuffle(&mut rand::rng());

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(
                self.id,
                self.role,
                TracingContext::from_current_span().to_w3c(),
            )),
        };

        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
        let channel_manager = self.channel_manager.clone();

        // Fan out to all peers concurrently; the first response wins.
        for addr in &peers {
            let mut client = self.create_asker(addr)?;
            let tx_clone = tx.clone();
            let req = req.clone();
            let addr = addr.clone();
            let channel_manager = channel_manager.clone();
            tokio::spawn(async move {
                match client.ask_leader(req).await {
                    Ok(res) => {
                        if let Some(endpoint) = res.into_inner().leader {
                            let _ = tx_clone.send(endpoint.addr).await;
                        } else {
                            warn!("No leader from: {addr}");
                        };
                    }
                    Err(status) => {
                        // Drop the cached channel to this peer so the next
                        // attempt reconnects instead of reusing a bad channel.
                        Self::reset_channels_with_manager(
                            &channel_manager,
                            std::slice::from_ref(&addr),
                        );
                        warn!("Failed to ask leader from: {addr}, {status}");
                    }
                }
            });
        }

        let leader = match timeout(
            self.channel_manager
                .config()
                .timeout
                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
            rx.recv(),
        )
        .await
        {
            Ok(Some(leader)) => leader,
            Ok(None) => return error::NoLeaderSnafu.fail(),
            Err(e) => {
                // No peer answered in time: reset all channels before bailing out.
                Self::reset_channels_with_manager(&self.channel_manager, &peers);
                return Err(e).context(error::AskLeaderTimeoutSnafu);
            }
        };

        let mut leadership_group = self.leadership_group.write().unwrap();
        leadership_group.leader = Some(leader.clone());

        Ok(leader)
    }

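    /// Asks the peers for the current leader, retrying up to `max_retry`
    /// times before giving up.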
    pub async fn ask_leader(&self) -> Result<String> {
        let mut times = 0;
        while times < self.max_retry {
            match self.ask_leader_inner().await {
                Ok(res) => {
                    return Ok(res);
                }
                Err(err) => {
                    warn!("Failed to ask leader, source: {err}, retried {times} times");
                    times += 1;
                    continue;
                }
            }
        }

        error::RetryTimesExceededSnafu {
            msg: "Failed to ask leader",
            times: self.max_retry,
        }
        .fail()
    }

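    /// Creates a `HeartbeatClient` over a channel to `addr` obtained from the
    /// channel manager.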
    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
        Ok(HeartbeatClient::new(
            self.channel_manager
                .get(addr)
                .context(error::CreateChannelSnafu)?,
        ))
    }

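    /// Evicts the cached channels to the given peers so that subsequent
    /// requests establish fresh connections.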
    fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
        if peers.is_empty() {
            return;
        }

        channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
    }
}

#[async_trait]
impl LeaderProvider for AskLeader {
    fn leader(&self) -> Option<String> {
        self.get_leader()
    }

    async fn ask_leader(&self) -> Result<String> {
        self.ask_leader().await
    }
}

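/// A [`LeaderProviderFactory`] that creates [`AskLeader`] providers.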
#[derive(Clone, Debug)]
pub struct LeaderProviderFactoryImpl {
    id: Id,
    role: Role,
    max_retry: usize,
    channel_manager: ChannelManager,
}

impl LeaderProviderFactoryImpl {
    pub fn new(id: Id, role: Role, max_retry: usize, channel_manager: ChannelManager) -> Self {
        Self {
            id,
            role,
            max_retry,
            channel_manager,
        }
    }
}

impl LeaderProviderFactory for LeaderProviderFactoryImpl {
    fn create(&self, peers: &[&str]) -> LeaderProviderRef {
        Arc::new(AskLeader::new(
            self.id,
            self.role,
            peers.iter().map(|p| p.to_string()).collect::<Vec<_>>(),
            self.channel_manager.clone(),
            self.max_retry,
        ))
    }
}