meta_client/client/
ask_leader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, RwLock};
16use std::time::Duration;
17
18use api::v1::meta::heartbeat_client::HeartbeatClient;
19use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
20use common_grpc::channel_manager::ChannelManager;
21use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
22use common_telemetry::tracing_context::TracingContext;
23use common_telemetry::warn;
24use rand::seq::SliceRandom;
25use snafu::{OptionExt, ResultExt};
26use tokio::time::timeout;
27use tonic::transport::Channel;
28
29use crate::client::Id;
30use crate::error;
31use crate::error::Result;
32
/// Mutable leader-discovery state: the set of peers that may be asked for
/// the leader, plus the leader address learned from the last successful ask.
#[derive(Debug)]
struct LeadershipGroup {
    /// Leader address from the most recent successful ask; `None` until then.
    leader: Option<String>,
    /// Addresses of every peer that can be asked for the leader.
    peers: Vec<String>,
}
38
39#[derive(Clone, Debug)]
40pub struct AskLeader {
41    id: Id,
42    role: Role,
43    leadership_group: Arc<RwLock<LeadershipGroup>>,
44    channel_manager: ChannelManager,
45    max_retry: usize,
46}
47
48impl AskLeader {
49    pub fn new(
50        id: Id,
51        role: Role,
52        peers: impl Into<Vec<String>>,
53        channel_manager: ChannelManager,
54        max_retry: usize,
55    ) -> Self {
56        let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
57            leader: None,
58            peers: peers.into(),
59        }));
60        Self {
61            id,
62            role,
63            leadership_group,
64            channel_manager,
65            max_retry,
66        }
67    }
68
69    pub fn get_leader(&self) -> Option<String> {
70        self.leadership_group.read().unwrap().leader.clone()
71    }
72
73    async fn ask_leader_inner(&self) -> Result<String> {
74        let mut peers = {
75            let leadership_group = self.leadership_group.read().unwrap();
76            leadership_group.peers.clone()
77        };
78        peers.shuffle(&mut rand::rng());
79
80        let req = AskLeaderRequest {
81            header: Some(RequestHeader::new(
82                self.id,
83                self.role,
84                TracingContext::from_current_span().to_w3c(),
85            )),
86        };
87
88        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
89
90        for addr in &peers {
91            let mut client = self.create_asker(addr)?;
92            let tx_clone = tx.clone();
93            let req = req.clone();
94            let addr = addr.to_string();
95            tokio::spawn(async move {
96                match client.ask_leader(req).await {
97                    Ok(res) => {
98                        if let Some(endpoint) = res.into_inner().leader {
99                            let _ = tx_clone.send(endpoint.addr).await;
100                        } else {
101                            warn!("No leader from: {addr}");
102                        };
103                    }
104                    Err(status) => {
105                        warn!("Failed to ask leader from: {addr}, {status}");
106                    }
107                }
108            });
109        }
110
111        let leader = timeout(
112            self.channel_manager
113                .config()
114                .timeout
115                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
116            rx.recv(),
117        )
118        .await
119        .context(error::AskLeaderTimeoutSnafu)?
120        .context(error::NoLeaderSnafu)?;
121
122        let mut leadership_group = self.leadership_group.write().unwrap();
123        leadership_group.leader = Some(leader.clone());
124
125        Ok(leader)
126    }
127
128    pub async fn ask_leader(&self) -> Result<String> {
129        let mut times = 0;
130        while times < self.max_retry {
131            match self.ask_leader_inner().await {
132                Ok(res) => {
133                    return Ok(res);
134                }
135                Err(err) => {
136                    warn!("Failed to ask leader, source: {err}, retry {times} times");
137                    times += 1;
138                    continue;
139                }
140            }
141        }
142
143        error::RetryTimesExceededSnafu {
144            msg: "Failed to ask leader",
145            times: self.max_retry,
146        }
147        .fail()
148    }
149
150    fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
151        Ok(HeartbeatClient::new(
152            self.channel_manager
153                .get(addr)
154                .context(error::CreateChannelSnafu)?,
155        ))
156    }
157}