meta_client/client/
ask_leader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use api::v1::meta::heartbeat_client::HeartbeatClient;
20use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
21use async_trait::async_trait;
22use common_grpc::channel_manager::ChannelManager;
23use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
24use common_telemetry::tracing_context::TracingContext;
25use common_telemetry::warn;
26use rand::seq::SliceRandom;
27use snafu::ResultExt;
28use tokio::time::timeout;
29use tonic::transport::Channel;
30
31use crate::client::Id;
32use crate::error;
33use crate::error::Result;
34
35pub type LeaderProviderRef = Arc<dyn LeaderProvider>;
36
37/// Provide [MetaClient] a Metasrv leader's address.
38#[async_trait]
39pub trait LeaderProvider: Debug + Send + Sync {
40    /// Get the leader of the Metasrv. If it returns `None`, or the leader is outdated,
41    /// you can use `ask_leader` to find a new one.
42    fn leader(&self) -> Option<String>;
43
44    /// Find the current leader of the Metasrv.
45    async fn ask_leader(&self) -> Result<String>;
46}
47
/// Leadership state that [`AskLeader`] keeps behind its lock.
#[derive(Debug)]
struct LeadershipGroup {
    /// Address of the last leader reported by a peer; `None` until one has been found.
    leader: Option<String>,
    /// Candidate Metasrv addresses that are asked for the leader.
    peers: Vec<String>,
}
53
54#[derive(Clone, Debug)]
55pub struct AskLeader {
56    id: Id,
57    role: Role,
58    leadership_group: Arc<RwLock<LeadershipGroup>>,
59    channel_manager: ChannelManager,
60    max_retry: usize,
61}
62
63impl AskLeader {
64    pub fn new(
65        id: Id,
66        role: Role,
67        peers: impl Into<Vec<String>>,
68        channel_manager: ChannelManager,
69        max_retry: usize,
70    ) -> Self {
71        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
72            leader: None,
73            peers: peers.into(),
74        }));
75        Self {
76            id,
77            role,
78            leadership_group,
79            channel_manager,
80            max_retry,
81        }
82    }
83
84    pub fn get_leader(&self) -> Option<String> {
85        self.leadership_group.read().unwrap().leader.clone()
86    }
87
88    async fn ask_leader_inner(&self) -> Result<String> {
89        let mut peers = {
90            let leadership_group = self.leadership_group.read().unwrap();
91            leadership_group.peers.clone()
92        };
93        peers.shuffle(&mut rand::rng());
94
95        let req = AskLeaderRequest {
96            header: Some(RequestHeader::new(
97                self.id,
98                self.role,
99                TracingContext::from_current_span().to_w3c(),
100            )),
101        };
102
103        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
104        let channel_manager = self.channel_manager.clone();
105
106        for addr in &peers {
107            let mut client = self.create_asker(addr)?;
108            let tx_clone = tx.clone();
109            let req = req.clone();
110            let addr = addr.clone();
111            let channel_manager = channel_manager.clone();
112            tokio::spawn(async move {
113                match client.ask_leader(req).await {
114                    Ok(res) => {
115                        if let Some(endpoint) = res.into_inner().leader {
116                            let _ = tx_clone.send(endpoint.addr).await;
117                        } else {
118                            warn!("No leader from: {addr}");
119                        };
120                    }
121                    Err(status) => {
122                        // Reset cached channel even on generic errors: the VIP may keep us on a dead
123                        // backend, so forcing a reconnect gives us a chance to hit a healthy peer.
124                        Self::reset_channels_with_manager(
125                            &channel_manager,
126                            std::slice::from_ref(&addr),
127                        );
128                        warn!("Failed to ask leader from: {addr}, {status}");
129                    }
130                }
131            });
132        }
133
134        let leader = match timeout(
135            self.channel_manager
136                .config()
137                .timeout
138                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
139            rx.recv(),
140        )
141        .await
142        {
143            Ok(Some(leader)) => leader,
144            Ok(None) => return error::NoLeaderSnafu.fail(),
145            Err(e) => {
146                // All peers timed out. Reset channels to force reconnection,
147                // which may help escape dead backends in VIP/LB scenarios.
148                Self::reset_channels_with_manager(&self.channel_manager, &peers);
149                return Err(e).context(error::AskLeaderTimeoutSnafu);
150            }
151        };
152
153        let mut leadership_group = self.leadership_group.write().unwrap();
154        leadership_group.leader = Some(leader.clone());
155
156        Ok(leader)
157    }
158
159    pub async fn ask_leader(&self) -> Result<String> {
160        let mut times = 0;
161        while times < self.max_retry {
162            match self.ask_leader_inner().await {
163                Ok(res) => {
164                    return Ok(res);
165                }
166                Err(err) => {
167                    warn!("Failed to ask leader, source: {err}, retry {times} times");
168                    times += 1;
169                    continue;
170                }
171            }
172        }
173
174        error::RetryTimesExceededSnafu {
175            msg: "Failed to ask leader",
176            times: self.max_retry,
177        }
178        .fail()
179    }
180
181    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
182        Ok(HeartbeatClient::new(
183            self.channel_manager
184                .get(addr)
185                .context(error::CreateChannelSnafu)?,
186        ))
187    }
188
189    /// Drop cached channels for the given peers so a fresh connection is used next time.
190    fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
191        if peers.is_empty() {
192            return;
193        }
194
195        channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
196    }
197}
198
199#[async_trait]
200impl LeaderProvider for AskLeader {
201    fn leader(&self) -> Option<String> {
202        self.get_leader()
203    }
204
205    async fn ask_leader(&self) -> Result<String> {
206        self.ask_leader().await
207    }
208}