meta_client/client/
ask_leader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
21use async_trait::async_trait;
22use common_grpc::channel_manager::ChannelManager;
23use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::warn;
26use rand::seq::SliceRandom;
27use snafu::{OptionExt, ResultExt};
28use tokio::time::timeout;
29use tonic::transport::Channel;
30
31use crate::client::Id;
32use crate::error;
33use crate::error::Result;
34
/// Shared, dynamically dispatched handle to a [`LeaderProvider`].
pub type LeaderProviderRef = Arc<dyn LeaderProvider>;
36
/// Provides `MetaClient` with the address of the Metasrv leader.
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
    /// Gets the leader of the Metasrv.
    ///
    /// If this returns `None`, or the returned leader is outdated, call
    /// [`LeaderProvider::ask_leader`] to find a new one.
    fn leader(&self) -> Option<String>;

    /// Finds the current leader of the Metasrv.
    async fn ask_leader(&self) -> Result<String>;
}
47
/// Peer list and cached leader of an [`AskLeader`].
///
/// Held behind an `Arc<RwLock<_>>`, so every clone of an `AskLeader` sees the
/// same state.
#[derive(Debug)]
struct LeadershipGroup {
    /// Last leader address reported by a peer; `None` until an ask succeeds.
    leader: Option<String>,
    /// Addresses of the Metasrv nodes that are asked for the leader.
    peers: Vec<String>,
}
53
/// A [`LeaderProvider`] that discovers the Metasrv leader by asking all
/// configured peers concurrently and taking the first reported address.
///
/// Clones share the peer list and the cached leader, because that state lives
/// behind an `Arc`.
#[derive(Clone, Debug)]
pub struct AskLeader {
    /// Identifies this client in the request header sent to peers.
    id: Id,
    /// Role of this client, also sent in the request header.
    role: Role,
    /// Peer list and cached leader, shared between clones.
    leadership_group: Arc<RwLock<LeadershipGroup>>,
    /// Source of the gRPC channels used to reach each peer; its configured
    /// timeout (if set) bounds how long one asking round waits.
    channel_manager: ChannelManager,
    /// Number of asking rounds `ask_leader` attempts before giving up.
    max_retry: usize,
}
62
63impl AskLeader {
64    pub fn new(
65        id: Id,
66        role: Role,
67        peers: impl Into<Vec<String>>,
68        channel_manager: ChannelManager,
69        max_retry: usize,
70    ) -> Self {
71        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
72            leader: None,
73            peers: peers.into(),
74        }));
75        Self {
76            id,
77            role,
78            leadership_group,
79            channel_manager,
80            max_retry,
81        }
82    }
83
84    pub fn get_leader(&self) -> Option<String> {
85        self.leadership_group.read().unwrap().leader.clone()
86    }
87
88    async fn ask_leader_inner(&self) -> Result<String> {
89        let mut peers = {
90            let leadership_group = self.leadership_group.read().unwrap();
91            leadership_group.peers.clone()
92        };
93        peers.shuffle(&mut rand::rng());
94
95        let req = AskLeaderRequest {
96            header: Some(RequestHeader::new(
97                self.id,
98                self.role,
99                TracingContext::from_current_span().to_w3c(),
100            )),
101        };
102
103        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
104
105        for addr in &peers {
106            let mut client = self.create_asker(addr)?;
107            let tx_clone = tx.clone();
108            let req = req.clone();
109            let addr = addr.to_string();
110            tokio::spawn(async move {
111                match client.ask_leader(req).await {
112                    Ok(res) => {
113                        if let Some(endpoint) = res.into_inner().leader {
114                            let _ = tx_clone.send(endpoint.addr).await;
115                        } else {
116                            warn!("No leader from: {addr}");
117                        };
118                    }
119                    Err(status) => {
120                        warn!("Failed to ask leader from: {addr}, {status}");
121                    }
122                }
123            });
124        }
125
126        let leader = timeout(
127            self.channel_manager
128                .config()
129                .timeout
130                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
131            rx.recv(),
132        )
133        .await
134        .context(error::AskLeaderTimeoutSnafu)?
135        .context(error::NoLeaderSnafu)?;
136
137        let mut leadership_group = self.leadership_group.write().unwrap();
138        leadership_group.leader = Some(leader.clone());
139
140        Ok(leader)
141    }
142
143    pub async fn ask_leader(&self) -> Result<String> {
144        let mut times = 0;
145        while times < self.max_retry {
146            match self.ask_leader_inner().await {
147                Ok(res) => {
148                    return Ok(res);
149                }
150                Err(err) => {
151                    warn!("Failed to ask leader, source: {err}, retry {times} times");
152                    times += 1;
153                    continue;
154                }
155            }
156        }
157
158        error::RetryTimesExceededSnafu {
159            msg: "Failed to ask leader",
160            times: self.max_retry,
161        }
162        .fail()
163    }
164
165    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
166        Ok(HeartbeatClient::new(
167            self.channel_manager
168                .get(addr)
169                .context(error::CreateChannelSnafu)?,
170        ))
171    }
172}
173
174#[async_trait]
175impl LeaderProvider for AskLeader {
176    fn leader(&self) -> Option<String> {
177        self.get_leader()
178    }
179
180    async fn ask_leader(&self) -> Result<String> {
181        self.ask_leader().await
182    }
183}