meta_client/client/ask_leader.rs

use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use async_trait::async_trait;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use tokio::time::timeout;
use tonic::transport::Channel;

use crate::client::Id;
use crate::error;
use crate::error::Result;

pub type LeaderProviderRef = Arc<dyn LeaderProvider>;

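/// Provides the address of the current meta server leader, either from a
/// local cache or by querying the peers.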
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
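    /// Returns the currently known leader address, if any, without making a
    /// remote call.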
    fn leader(&self) -> Option<String>;

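    /// Queries the peers for the current leader address.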
    async fn ask_leader(&self) -> Result<String>;
}

#[derive(Debug)]
struct LeadershipGroup {
    leader: Option<String>,
    peers: Vec<String>,
}

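/// Discovers the leader among a set of meta server peers by broadcasting
/// `AskLeaderRequest`s and caches the most recent answer.
///
/// A minimal usage sketch (illustrative only; `id`, `channel_manager`, the
/// role, and the peer address are placeholders rather than values defined in
/// this module):
///
/// ```ignore
/// let ask_leader = AskLeader::new(
///     id,
///     Role::Datanode,
///     vec!["127.0.0.1:3002".to_string()],
///     channel_manager,
///     3,
/// );
/// let leader = ask_leader.ask_leader().await?;
/// assert_eq!(ask_leader.get_leader(), Some(leader));
/// ```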
#[derive(Clone, Debug)]
pub struct AskLeader {
    id: Id,
    role: Role,
    leadership_group: Arc<RwLock<LeadershipGroup>>,
    channel_manager: ChannelManager,
    max_retry: usize,
}

impl AskLeader {
    pub fn new(
        id: Id,
        role: Role,
        peers: impl Into<Vec<String>>,
        channel_manager: ChannelManager,
        max_retry: usize,
    ) -> Self {
        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
            leader: None,
            peers: peers.into(),
        }));
        Self {
            id,
            role,
            leadership_group,
            channel_manager,
            max_retry,
        }
    }

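    /// Returns the cached leader address without contacting any peer.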
    pub fn get_leader(&self) -> Option<String> {
        self.leadership_group.read().unwrap().leader.clone()
    }

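    /// Sends an `AskLeaderRequest` to every known peer concurrently and
    /// returns the first leader address that comes back, updating the cache.
    /// Waits at most the channel manager's configured timeout, falling back to
    /// `META_KEEP_ALIVE_INTERVAL_SECS` seconds when none is set.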
    async fn ask_leader_inner(&self) -> Result<String> {
        let mut peers = {
            let leadership_group = self.leadership_group.read().unwrap();
            leadership_group.peers.clone()
        };
        peers.shuffle(&mut rand::rng());

        let req = AskLeaderRequest {
            header: Some(RequestHeader::new(
                self.id,
                self.role,
                TracingContext::from_current_span().to_w3c(),
            )),
        };

        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());

        for addr in &peers {
            let mut client = self.create_asker(addr)?;
            let tx_clone = tx.clone();
            let req = req.clone();
            let addr = addr.to_string();
            tokio::spawn(async move {
                match client.ask_leader(req).await {
                    Ok(res) => {
                        if let Some(endpoint) = res.into_inner().leader {
                            let _ = tx_clone.send(endpoint.addr).await;
                        } else {
                            warn!("No leader from: {addr}");
                        };
                    }
                    Err(status) => {
                        warn!("Failed to ask leader from: {addr}, {status}");
                    }
                }
            });
        }

        let leader = timeout(
            self.channel_manager
                .config()
                .timeout
                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
            rx.recv(),
        )
        .await
        .context(error::AskLeaderTimeoutSnafu)?
        .context(error::NoLeaderSnafu)?;

        let mut leadership_group = self.leadership_group.write().unwrap();
        leadership_group.leader = Some(leader.clone());

        Ok(leader)
    }

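    /// Asks for the leader, retrying `ask_leader_inner` up to `max_retry`
    /// times before failing with a retry-exceeded error.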
    pub async fn ask_leader(&self) -> Result<String> {
        let mut times = 0;
        while times < self.max_retry {
            match self.ask_leader_inner().await {
                Ok(res) => {
                    return Ok(res);
                }
                Err(err) => {
                    warn!("Failed to ask leader, source: {err}, retry {times} times");
                    times += 1;
                    continue;
                }
            }
        }

        error::RetryTimesExceededSnafu {
            msg: "Failed to ask leader",
            times: self.max_retry,
        }
        .fail()
    }

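    /// Builds a `HeartbeatClient` over a channel to the given peer address.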
    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
        Ok(HeartbeatClient::new(
            self.channel_manager
                .get(addr)
                .context(error::CreateChannelSnafu)?,
        ))
    }
}

#[async_trait]
impl LeaderProvider for AskLeader {
    fn leader(&self) -> Option<String> {
        self.get_leader()
    }

    async fn ask_leader(&self) -> Result<String> {
        self.ask_leader().await
    }
}