// meta_client/client.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod config;
17pub mod heartbeat;
18mod load_balance;
19mod procedure;
20
21mod cluster;
22mod store;
23mod util;
24
25use std::fmt::Debug;
26use std::sync::Arc;
27use std::time::Duration;
28
29use api::v1::meta::heartbeat_request::NodeWorkloads;
30use api::v1::meta::{
31    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
32};
33pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
34use cluster::Client as ClusterClient;
35pub use cluster::ClusterKvBackend;
36use common_error::ext::BoxedError;
37use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
38use common_meta::cluster::{
39    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
40};
41use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
42use common_meta::distributed_time_constants::default_distributed_time_constants;
43use common_meta::error::{
44    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
45};
46use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
47use common_meta::kv_backend::KvBackendRef;
48use common_meta::peer::{Peer, PeerDiscovery};
49use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
50use common_meta::range_stream::PaginationStream;
51use common_meta::rpc::KeyValue;
52use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
53use common_meta::rpc::procedure::{
54    AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
55    GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
56    ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
57};
58use common_meta::rpc::store::{
59    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
60    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
61    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
62};
63use common_options::plugin_options::PluginOptionsDeserializer;
64use common_telemetry::info;
65use common_time::util::DefaultSystemTimer;
66use config::Client as ConfigClient;
67use futures::TryStreamExt;
68use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
69use procedure::Client as ProcedureClient;
70use serde::de::DeserializeOwned;
71use snafu::{OptionExt, ResultExt};
72use store::Client as StoreClient;
73
74pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
75use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
76use crate::error::{
77    ConvertMetaConfigSnafu, ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error,
78    GetFlowStatSnafu, NotStartedSnafu, Result,
79};
80
/// Identifier of a meta client; set from the node's member id in [`MetaClientBuilder::new`].
pub type Id = u64;

// Max retries when asking metasrv which node is the current leader.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
// Max retries when submitting a DDL task through the procedure client.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
// Max retries for cluster-info requests.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
// Default per-request timeout for DDL requests (overridable via `ddl_timeout`).
const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
87
/// Builder for [`MetaClient`]: selects which sub-clients (heartbeat, store,
/// procedure, cluster info) are created by `build`.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    // Member id of the owning node (0 for Frontend, which needs no member id).
    id: Id,
    // Role reported to metasrv (Frontend/Datanode/Flownode).
    role: Role,
    enable_heartbeat: bool,
    enable_store: bool,
    enable_procedure: bool,
    enable_access_cluster_info: bool,
    // Optional client used to manage region followers.
    region_follower: Option<RegionFollowerClientRef>,
    // Channel manager shared by all sub-clients unless a dedicated one below applies.
    channel_manager: Option<ChannelManager>,
    // Dedicated channel manager for DDL (procedure) requests, if any.
    ddl_channel_manager: Option<ChannelManager>,
    /// The default ddl timeout for each request.
    ddl_timeout: Option<Duration>,
    // Dedicated channel manager for heartbeat streams, if any.
    heartbeat_channel_manager: Option<ChannelManager>,
}
103
104impl MetaClientBuilder {
105    pub fn new(member_id: u64, role: Role) -> Self {
106        Self {
107            id: member_id,
108            role,
109            ..Default::default()
110        }
111    }
112
113    /// Returns the role of Frontend's default options.
114    pub fn frontend_default_options() -> Self {
115        // Frontend does not need a member id.
116        Self::new(0, Role::Frontend)
117            .enable_store()
118            .enable_heartbeat()
119            .enable_procedure()
120            .enable_access_cluster_info()
121    }
122
123    /// Returns the role of Datanode's default options.
124    pub fn datanode_default_options(member_id: u64) -> Self {
125        Self::new(member_id, Role::Datanode)
126            .enable_store()
127            .enable_heartbeat()
128    }
129
130    /// Returns the role of Flownode's default options.
131    pub fn flownode_default_options(member_id: u64) -> Self {
132        Self::new(member_id, Role::Flownode)
133            .enable_store()
134            .enable_heartbeat()
135            .enable_procedure()
136            .enable_access_cluster_info()
137    }
138
139    pub fn enable_heartbeat(self) -> Self {
140        Self {
141            enable_heartbeat: true,
142            ..self
143        }
144    }
145
146    pub fn enable_store(self) -> Self {
147        Self {
148            enable_store: true,
149            ..self
150        }
151    }
152
153    pub fn enable_procedure(self) -> Self {
154        Self {
155            enable_procedure: true,
156            ..self
157        }
158    }
159
160    pub fn enable_access_cluster_info(self) -> Self {
161        Self {
162            enable_access_cluster_info: true,
163            ..self
164        }
165    }
166
167    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
168        Self {
169            channel_manager: Some(channel_manager),
170            ..self
171        }
172    }
173
174    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
175        Self {
176            ddl_channel_manager: Some(channel_manager),
177            ..self
178        }
179    }
180
181    pub fn ddl_timeout(self, timeout: Duration) -> Self {
182        Self {
183            ddl_timeout: Some(timeout),
184            ..self
185        }
186    }
187
188    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
189        Self {
190            heartbeat_channel_manager: Some(channel_manager),
191            ..self
192        }
193    }
194
195    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
196        Self {
197            region_follower: Some(region_follower),
198            ..self
199        }
200    }
201
202    pub fn build(self) -> MetaClient {
203        let mgr = self.channel_manager.unwrap_or_default();
204        let heartbeat_channel_manager = self
205            .heartbeat_channel_manager
206            .clone()
207            .unwrap_or_else(|| mgr.clone());
208
209        let heartbeat = self.enable_heartbeat.then(|| {
210            if self.heartbeat_channel_manager.is_some() {
211                info!("Enable heartbeat channel using the heartbeat channel manager.");
212            }
213
214            HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
215        });
216        let config = self
217            .enable_heartbeat
218            .then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
219        let store = self
220            .enable_store
221            .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
222        let procedure = self.enable_procedure.then(|| {
223            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
224            ProcedureClient::new(
225                self.id,
226                self.role,
227                mgr,
228                DEFAULT_SUBMIT_DDL_MAX_RETRY,
229                self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
230            )
231        });
232        let cluster = self
233            .enable_access_cluster_info
234            .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
235        let region_follower = self.region_follower.clone();
236
237        MetaClient {
238            id: self.id,
239            channel_manager: mgr.clone(),
240            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
241                self.id,
242                self.role,
243                DEFAULT_ASK_LEADER_MAX_RETRY,
244                heartbeat_channel_manager,
245            )),
246            heartbeat,
247            config,
248            store,
249            procedure,
250            cluster,
251            region_follower,
252        }
253    }
254}
255
/// A client of metasrv. Each sub-client is `None` until enabled via
/// [`MetaClientBuilder`] and started through `start`/`start_with`.
#[derive(Debug)]
pub struct MetaClient {
    // Member id of the owning node.
    id: Id,
    // Channel manager shared by the sub-clients (see `MetaClientBuilder::build`).
    channel_manager: ChannelManager,
    // Creates `LeaderProvider`s from a set of metasrv peer urls.
    leader_provider_factory: LeaderProviderFactoryRef,
    heartbeat: Option<HeartbeatClient>,
    config: Option<ConfigClient>,
    store: Option<StoreClient>,
    procedure: Option<ProcedureClient>,
    cluster: Option<ClusterClient>,
    region_follower: Option<RegionFollowerClientRef>,
}
268
269impl MetaClient {
270    pub fn new(id: Id, role: Role) -> Self {
271        Self {
272            id,
273            channel_manager: ChannelManager::default(),
274            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
275                id,
276                role,
277                DEFAULT_ASK_LEADER_MAX_RETRY,
278                ChannelManager::default(),
279            )),
280            heartbeat: None,
281            config: None,
282            store: None,
283            procedure: None,
284            cluster: None,
285            region_follower: None,
286        }
287    }
288}
289
/// Shared, thread-safe handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Submits an [`AddRegionFollowerRequest`].
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Submits a [`RemoveRegionFollowerRequest`].
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Submits an [`AddTableFollowerRequest`] (table-level follower management).
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Submits a [`RemoveTableFollowerRequest`] (table-level follower management).
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client against the given metasrv urls.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a pre-built [`LeaderProvider`].
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
307
308#[async_trait::async_trait]
309impl ProcedureExecutor for MetaClient {
310    async fn submit_ddl_task(
311        &self,
312        _ctx: &ExecutorContext,
313        request: SubmitDdlTaskRequest,
314    ) -> MetaResult<SubmitDdlTaskResponse> {
315        self.submit_ddl_task(request)
316            .await
317            .map_err(BoxedError::new)
318            .context(meta_error::ExternalSnafu)
319    }
320
321    async fn migrate_region(
322        &self,
323        _ctx: &ExecutorContext,
324        request: MigrateRegionRequest,
325    ) -> MetaResult<MigrateRegionResponse> {
326        self.migrate_region(request)
327            .await
328            .map_err(BoxedError::new)
329            .context(meta_error::ExternalSnafu)
330    }
331
332    async fn reconcile(
333        &self,
334        _ctx: &ExecutorContext,
335        request: ReconcileRequest,
336    ) -> MetaResult<ReconcileResponse> {
337        self.reconcile(request)
338            .await
339            .map_err(BoxedError::new)
340            .context(meta_error::ExternalSnafu)
341    }
342
343    async fn manage_region_follower(
344        &self,
345        _ctx: &ExecutorContext,
346        request: ManageRegionFollowerRequest,
347    ) -> MetaResult<()> {
348        if let Some(region_follower) = &self.region_follower {
349            match request {
350                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
351                    region_follower
352                        .add_region_follower(add_region_follower_request)
353                        .await
354                }
355                ManageRegionFollowerRequest::RemoveRegionFollower(
356                    remove_region_follower_request,
357                ) => {
358                    region_follower
359                        .remove_region_follower(remove_region_follower_request)
360                        .await
361                }
362                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
363                    region_follower
364                        .add_table_follower(add_table_follower_request)
365                        .await
366                }
367                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
368                    region_follower
369                        .remove_table_follower(remove_table_follower_request)
370                        .await
371                }
372            }
373            .map_err(BoxedError::new)
374            .context(meta_error::ExternalSnafu)
375        } else {
376            UnsupportedSnafu {
377                operation: "manage_region_follower",
378            }
379            .fail()
380        }
381    }
382
383    async fn query_procedure_state(
384        &self,
385        _ctx: &ExecutorContext,
386        pid: &str,
387    ) -> MetaResult<ProcedureStateResponse> {
388        self.query_procedure_state(pid)
389            .await
390            .map_err(BoxedError::new)
391            .context(meta_error::ExternalSnafu)
392    }
393
394    async fn gc_regions(
395        &self,
396        _ctx: &ExecutorContext,
397        request: GcRegionsRequest,
398    ) -> MetaResult<GcResponse> {
399        self.gc_regions(request)
400            .await
401            .map_err(BoxedError::new)
402            .context(meta_error::ExternalSnafu)
403    }
404
405    async fn gc_table(
406        &self,
407        _ctx: &ExecutorContext,
408        request: GcTableRequest,
409    ) -> MetaResult<GcResponse> {
410        self.gc_table(request)
411            .await
412            .map_err(BoxedError::new)
413            .context(meta_error::ExternalSnafu)
414    }
415
416    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
417        self.procedure_client()
418            .map_err(BoxedError::new)
419            .context(meta_error::ExternalSnafu)?
420            .list_procedures()
421            .await
422            .map_err(BoxedError::new)
423            .context(meta_error::ExternalSnafu)
424    }
425}
426
427// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
428#[allow(deprecated)]
429#[async_trait::async_trait]
430impl ClusterInfo for MetaClient {
431    type Error = Error;
432
433    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
434        let cluster_client = self.cluster_client()?;
435
436        let (get_metasrv_nodes, nodes_key_prefix) = match role {
437            None => (true, Some(NodeInfoKey::key_prefix())),
438            Some(ClusterRole::Metasrv) => (true, None),
439            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
440        };
441
442        let mut nodes = if get_metasrv_nodes {
443            let last_activity_ts = -1; // Metasrv does not provide this information.
444
445            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
446                cluster_client.get_metasrv_peers().await?;
447            followers
448                .into_iter()
449                .map(|node| {
450                    if let Some(node_info) = node.info {
451                        NodeInfo {
452                            peer: node.peer.unwrap_or_default(),
453                            last_activity_ts,
454                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
455                            version: node_info.version,
456                            git_commit: node_info.git_commit,
457                            start_time_ms: node_info.start_time_ms,
458                            total_cpu_millicores: node_info.total_cpu_millicores,
459                            total_memory_bytes: node_info.total_memory_bytes,
460                            cpu_usage_millicores: node_info.cpu_usage_millicores,
461                            memory_usage_bytes: node_info.memory_usage_bytes,
462                            hostname: node_info.hostname,
463                        }
464                    } else {
465                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
466                        NodeInfo {
467                            peer: node.peer.unwrap_or_default(),
468                            last_activity_ts,
469                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
470                            version: node.version,
471                            git_commit: node.git_commit,
472                            start_time_ms: node.start_time_ms,
473                            total_cpu_millicores: node.cpus as i64,
474                            total_memory_bytes: node.memory_bytes as i64,
475                            cpu_usage_millicores: 0,
476                            memory_usage_bytes: 0,
477                            hostname: "".to_string(),
478                        }
479                    }
480                })
481                .chain(leader.into_iter().map(|node| {
482                    if let Some(node_info) = node.info {
483                        NodeInfo {
484                            peer: node.peer.unwrap_or_default(),
485                            last_activity_ts,
486                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
487                            version: node_info.version,
488                            git_commit: node_info.git_commit,
489                            start_time_ms: node_info.start_time_ms,
490                            total_cpu_millicores: node_info.total_cpu_millicores,
491                            total_memory_bytes: node_info.total_memory_bytes,
492                            cpu_usage_millicores: node_info.cpu_usage_millicores,
493                            memory_usage_bytes: node_info.memory_usage_bytes,
494                            hostname: node_info.hostname,
495                        }
496                    } else {
497                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
498                        NodeInfo {
499                            peer: node.peer.unwrap_or_default(),
500                            last_activity_ts,
501                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
502                            version: node.version,
503                            git_commit: node.git_commit,
504                            start_time_ms: node.start_time_ms,
505                            total_cpu_millicores: node.cpus as i64,
506                            total_memory_bytes: node.memory_bytes as i64,
507                            cpu_usage_millicores: 0,
508                            memory_usage_bytes: 0,
509                            hostname: "".to_string(),
510                        }
511                    }
512                }))
513                .collect::<Vec<_>>()
514        } else {
515            Vec::new()
516        };
517
518        if let Some(prefix) = nodes_key_prefix {
519            let req = RangeRequest::new().with_prefix(prefix);
520            let res = cluster_client.range(req).await?;
521            for kv in res.kvs {
522                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
523            }
524        }
525
526        Ok(nodes)
527    }
528
529    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
530        let cluster_kv_backend = Arc::new(self.cluster_client()?);
531        let range_prefix = DatanodeStatKey::prefix_key();
532        let req = RangeRequest::new().with_prefix(range_prefix);
533        let stream =
534            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
535        let mut datanode_stats = stream
536            .try_collect::<Vec<_>>()
537            .await
538            .context(ConvertMetaResponseSnafu)?;
539        let region_stats = datanode_stats
540            .iter_mut()
541            .flat_map(|datanode_stat| {
542                let last = datanode_stat.stats.pop();
543                last.map(|stat| stat.region_stats).unwrap_or_default()
544            })
545            .collect::<Vec<_>>();
546
547        Ok(region_stats)
548    }
549
550    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
551        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
552        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
553        let flow_state_manager = FlowStateManager::new(cluster_backend);
554        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
555
556        Ok(res.map(|r| r.into()))
557    }
558}
559
560// TODO(weny): the discovery using client side timestamp may be inaccurate,
561// maybe we need to use the timestamp from metasrv in the future.
562#[async_trait::async_trait]
563impl PeerDiscovery for MetaClient {
564    async fn active_frontends(&self) -> MetaResult<Vec<Peer>> {
565        let nodes = self
566            .list_nodes(Some(ClusterRole::Frontend))
567            .await
568            .map_err(BoxedError::new)
569            .context(ExternalSnafu)?;
570        Ok(util::alive_frontends(
571            &DefaultSystemTimer,
572            nodes,
573            // TODO(weny): the heartbeat interval should be received from metasrv
574            // instead of using the default value.
575            default_distributed_time_constants().frontend_heartbeat_interval,
576        ))
577    }
578
579    async fn active_datanodes(
580        &self,
581        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
582    ) -> MetaResult<Vec<Peer>> {
583        let nodes = self
584            .list_nodes(Some(ClusterRole::Datanode))
585            .await
586            .map_err(BoxedError::new)
587            .context(ExternalSnafu)?;
588        Ok(util::alive_datanodes(
589            &DefaultSystemTimer,
590            nodes,
591            default_distributed_time_constants().datanode_lease,
592            filter,
593        ))
594    }
595
596    async fn active_flownodes(
597        &self,
598        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
599    ) -> MetaResult<Vec<Peer>> {
600        let nodes = self
601            .list_nodes(Some(ClusterRole::Flownode))
602            .await
603            .map_err(BoxedError::new)
604            .context(ExternalSnafu)?;
605        Ok(util::alive_flownodes(
606            &DefaultSystemTimer,
607            nodes,
608            default_distributed_time_constants().flownode_lease,
609            filter,
610        ))
611    }
612}
613
614fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
615    DatanodeStatValue::try_from(kv.value)
616        .map_err(BoxedError::new)
617        .context(ExternalSnafu)
618}
619
620impl MetaClient {
621    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
622    where
623        U: AsRef<str>,
624        A: AsRef<[U]> + Clone,
625    {
626        info!("MetaClient channel config: {:?}", self.channel_config());
627
628        let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
629        let leader_provider = self.leader_provider_factory.create(&urls);
630
631        self.start_with(leader_provider, urls).await
632    }
633
634    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
635    pub(crate) async fn start_with<U, A>(
636        &mut self,
637        leader_provider: LeaderProviderRef,
638        peers: A,
639    ) -> Result<()>
640    where
641        U: AsRef<str>,
642        A: AsRef<[U]> + Clone,
643    {
644        if let Some(client) = &self.region_follower {
645            info!("Starting region follower client ...");
646            client.start_with(leader_provider.clone()).await?;
647        }
648
649        if let Some(client) = &self.heartbeat {
650            info!("Starting heartbeat client ...");
651            client.start_with(leader_provider.clone()).await?;
652        }
653
654        if let Some(client) = &self.config {
655            info!("Starting config client ...");
656            client.start_with(leader_provider.clone()).await?;
657        }
658
659        if let Some(client) = &mut self.store {
660            info!("Starting store client ...");
661            client.start(peers.clone()).await?;
662        }
663
664        if let Some(client) = &self.procedure {
665            info!("Starting procedure client ...");
666            client.start_with(leader_provider.clone()).await?;
667        }
668
669        if let Some(client) = &mut self.cluster {
670            info!("Starting cluster client ...");
671            client.start_with(leader_provider).await?;
672        }
673        Ok(())
674    }
675
676    /// Ask the leader address of `metasrv`, and the heartbeat component
677    /// needs to create a bidirectional streaming to the leader.
678    pub async fn ask_leader(&self) -> Result<String> {
679        self.heartbeat_client()?.ask_leader().await
680    }
681
682    pub async fn pull_config<T, U>(&self, deserializer: T) -> Result<U>
683    where
684        T: PluginOptionsDeserializer<U>,
685        U: DeserializeOwned,
686    {
687        let res = self.config_client()?.pull_config().await?;
688        let v = deserializer
689            .deserialize(&res.payload)
690            .context(ConvertMetaConfigSnafu)?;
691        Ok(v)
692    }
693
694    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
695    /// other end is the leader of `metasrv`.
696    ///
697    /// The `datanode` needs to use the sender to continuously send heartbeat
698    /// packets (some self-state data), and the receiver can receive a response
699    /// from "metasrv" (which may contain some scheduling instructions).
700    ///
701    /// Returns the heartbeat sender, stream, and configuration received from Metasrv.
702    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
703        self.heartbeat_client()?.heartbeat().await
704    }
705
706    /// Range gets the keys in the range from the key-value store.
707    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
708        self.store_client()?
709            .range(req.into())
710            .await?
711            .try_into()
712            .context(ConvertMetaResponseSnafu)
713    }
714
715    /// Put puts the given key into the key-value store.
716    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
717        self.store_client()?
718            .put(req.into())
719            .await?
720            .try_into()
721            .context(ConvertMetaResponseSnafu)
722    }
723
724    /// BatchGet atomically get values by the given keys from the key-value store.
725    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
726        self.store_client()?
727            .batch_get(req.into())
728            .await?
729            .try_into()
730            .context(ConvertMetaResponseSnafu)
731    }
732
733    /// BatchPut atomically puts the given keys into the key-value store.
734    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
735        self.store_client()?
736            .batch_put(req.into())
737            .await?
738            .try_into()
739            .context(ConvertMetaResponseSnafu)
740    }
741
742    /// BatchDelete atomically deletes the given keys from the key-value store.
743    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
744        self.store_client()?
745            .batch_delete(req.into())
746            .await?
747            .try_into()
748            .context(ConvertMetaResponseSnafu)
749    }
750
751    /// CompareAndPut atomically puts the value to the given updated
752    /// value if the current value == the expected value.
753    pub async fn compare_and_put(
754        &self,
755        req: CompareAndPutRequest,
756    ) -> Result<CompareAndPutResponse> {
757        self.store_client()?
758            .compare_and_put(req.into())
759            .await?
760            .try_into()
761            .context(ConvertMetaResponseSnafu)
762    }
763
764    /// DeleteRange deletes the given range from the key-value store.
765    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
766        self.store_client()?
767            .delete_range(req.into())
768            .await?
769            .try_into()
770            .context(ConvertMetaResponseSnafu)
771    }
772
773    /// Query the procedure state by its id.
774    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
775        self.procedure_client()?.query_procedure_state(pid).await
776    }
777
778    /// Submit a region migration task.
779    pub async fn migrate_region(
780        &self,
781        request: MigrateRegionRequest,
782    ) -> Result<MigrateRegionResponse> {
783        self.procedure_client()?
784            .migrate_region(
785                request.region_id,
786                request.from_peer,
787                request.to_peer,
788                request.timeout,
789            )
790            .await
791    }
792
793    /// Reconcile the procedure state.
794    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
795        self.procedure_client()?.reconcile(request).await
796    }
797
798    /// Manually trigger GC for specific regions.
799    pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
800        self.procedure_client()?.gc_regions(request).await
801    }
802
803    /// Manually trigger GC for a table (all its regions).
804    pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
805        self.procedure_client()?.gc_table(request).await
806    }
807
808    /// Submit a DDL task
809    pub async fn submit_ddl_task(
810        &self,
811        req: SubmitDdlTaskRequest,
812    ) -> Result<SubmitDdlTaskResponse> {
813        let res = self
814            .procedure_client()?
815            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
816            .await?
817            .try_into()
818            .context(ConvertMetaResponseSnafu)?;
819
820        Ok(res)
821    }
822
823    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
824        self.heartbeat.clone().context(NotStartedSnafu {
825            name: "heartbeat_client",
826        })
827    }
828
829    pub fn config_client(&self) -> Result<ConfigClient> {
830        self.config.clone().context(NotStartedSnafu {
831            name: "config_client",
832        })
833    }
834
835    pub fn store_client(&self) -> Result<StoreClient> {
836        self.store.clone().context(NotStartedSnafu {
837            name: "store_client",
838        })
839    }
840
841    pub fn procedure_client(&self) -> Result<ProcedureClient> {
842        self.procedure.clone().context(NotStartedSnafu {
843            name: "procedure_client",
844        })
845    }
846
847    pub fn cluster_client(&self) -> Result<ClusterClient> {
848        self.cluster.clone().context(NotStartedSnafu {
849            name: "cluster_client",
850        })
851    }
852
853    pub fn channel_config(&self) -> &ChannelConfig {
854        self.channel_manager.config()
855    }
856
857    pub fn id(&self) -> Id {
858        self.id
859    }
860}
861
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use api::v1::meta::{HeartbeatRequest, Peer};
    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
    use rand::Rng;

    use super::*;
    use crate::error;
    use crate::mocks::{self, MockMetaContext};

    /// Key prefix shared by every test so generated data can be wiped without
    /// touching unrelated keys (relevant when running against a real etcd).
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// A [`MetaClient`] backed by a mock metasrv. Keys are namespaced by `ns`
    /// so tests running in the same store don't collide with each other.
    struct TestClient {
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }

    impl TestClient {
        async fn new(ns: impl Into<String>) -> Self {
            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds a namespaced key: `{TEST_KEY_PREFIX}-{ns}-{name}`.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Seeds the store with `key-0`..`key-9` => `value-0`..`value-9`.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("{}-{}", "value", i).into_bytes())
                    .with_prev_kv();
                let res = self.client.put(req).await;
                let _ = res.unwrap();
            }
        }

        /// Deletes every key under this test's namespace.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            let res = self.client.delete_range(req).await;
            let _ = res.unwrap();
        }

        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }

    /// Creates a namespaced test client with a clean key space.
    async fn new_client(ns: impl Into<String>) -> TestClient {
        let client = TestClient::new(ns).await;
        client.clear_data().await;
        client
    }

    #[tokio::test]
    async fn test_meta_client_builder() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

        // heartbeat only: store client must be unavailable.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();
        let _ = meta_client.heartbeat_client().unwrap();
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // nothing enabled: both clients unavailable.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
        assert!(meta_client.heartbeat_client().is_err());
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // store only: heartbeat client must be unavailable.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        assert!(meta_client.heartbeat_client().is_err());
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();

        // both enabled; the builder must also carry the id through.
        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
            .enable_heartbeat()
            .enable_store()
            .build();
        assert_eq!(2, meta_client.id());
        let _ = meta_client.heartbeat_client().unwrap();
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();
    }

    #[tokio::test]
    async fn test_not_start_heartbeat_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        meta_client.start(urls).await.unwrap();
        let res = meta_client.ask_leader().await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_not_start_store_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();

        meta_client.start(urls).await.unwrap();
        let res = meta_client.put(PutRequest::default()).await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let tc = new_client("test_ask_leader").await;
        tc.client.ask_leader().await.unwrap();
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
        // send heartbeats

        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        let _handle = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
            // `sender` is dropped here, which lets the mock server close the
            // response stream so the receiver loop below can terminate.
        });

        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let handle = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        handle.await.unwrap();
        // +1 for the initial response sent before any request is received.
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }

    #[tokio::test]
    async fn test_range_get() {
        let tc = new_client("test_range_get").await;
        tc.gen_data().await;

        let key = tc.key("key-0");
        let req = RangeRequest::new().with_key(key.as_slice());
        let res = tc.client.range(req).await;
        let mut kvs = res.unwrap().take_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_range_get_prefix() {
        let tc = new_client("test_range_get_prefix").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range() {
        let tc = new_client("test_range").await;
        tc.gen_data().await;

        // [key-5, key-8) — the range end is exclusive.
        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert_eq!(
                format!("{}-{}", "value", i + 5).into_bytes(),
                kv.take_value()
            );
        }
    }

    #[tokio::test]
    async fn test_range_keys_only() {
        let tc = new_client("test_range_keys_only").await;
        tc.gen_data().await;

        let req = RangeRequest::new()
            .with_range(tc.key("key-5"), tc.key("key-8"))
            .with_keys_only();
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert!(kv.take_value().is_empty());
        }
    }

    #[tokio::test]
    async fn test_put() {
        let tc = new_client("test_put").await;

        let req = PutRequest::new()
            .with_key(tc.key("key"))
            .with_value(b"value".to_vec());
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());
    }

    #[tokio::test]
    async fn test_put_with_prev_kv() {
        let tc = new_client("test_put_with_prev_kv").await;

        // first put: no previous value exists.
        let key = tc.key("key");
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());

        // second put: the previous value must be returned.
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value1".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        let mut kv = res.unwrap().prev_kv.unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_batch_put() {
        let tc = new_client("test_batch_put").await;

        // 275 kvs: large enough to exercise any internal batching.
        let mut req = BatchPutRequest::new();
        for i in 0..275 {
            req = req.add_kv(
                tc.key(&format!("key-{}", i)),
                format!("value-{}", i).into_bytes(),
            );
        }

        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(275, kvs.len());
    }

    #[tokio::test]
    async fn test_batch_get() {
        let tc = new_client("test_batch_get").await;
        tc.gen_data().await;

        // Only key-0..key-9 exist; the other 246 requested keys are absent.
        let mut req = BatchGetRequest::default();
        for i in 0..256 {
            req = req.add_key(tc.key(&format!("key-{}", i)));
        }
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(10, res.kvs.len());

        let req = BatchGetRequest::default()
            .add_key(tc.key("key-1"))
            .add_key(tc.key("key-999"));
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(1, res.kvs.len());
    }

    #[tokio::test]
    async fn test_batch_put_with_prev_kv() {
        let tc = new_client("test_batch_put_with_prev_kv").await;

        let key = tc.key("key");
        let key2 = tc.key("key2");
        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        // Only `key` existed before, so exactly one prev_kv is returned.
        let req = BatchPutRequest::new()
            .add_kv(key.as_slice(), b"value-".to_vec())
            .add_kv(key2.as_slice(), b"value2-".to_vec())
            .with_prev_kv();
        let res = tc.client.batch_put(req).await;
        let mut kvs = res.unwrap().take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let tc = new_client("test_compare_and_put").await;

        // expect a value on a missing key: must fail.
        let key = tc.key("key");
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"expect".to_vec())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        assert!(!res.unwrap().is_success());

        // create if absent
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());
        assert!(res.take_prev_kv().is_none());

        // compare and put fail
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"not_eq".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(!res.is_success());
        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());

        // compare and put success
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"value".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());

        // If compare-and-put is success, previous value doesn't need to be returned.
        assert!(res.take_prev_kv().is_none());
    }

    #[tokio::test]
    async fn test_delete_with_key() {
        let tc = new_client("test_delete_with_key").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_key(tc.key("key-0"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(1, res.deleted());
        let mut kvs = res.take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_delete_with_prefix() {
        let tc = new_client("test_delete_with_prefix").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_prefix(tc.key("key-"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(10, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_delete_with_range() {
        let tc = new_client("test_delete_with_range").await;
        tc.gen_data().await;

        // [key-2, key-7) — deletes key-2..key-6, the end is exclusive.
        let req = DeleteRangeRequest::new()
            .with_range(tc.key("key-2"), tc.key("key-7"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(5, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(5, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(
                format!("{}-{}", "value", i + 2).into_bytes(),
                kv.take_value()
            );
        }
    }

    /// Decoder for [`PaginationStream`] that discards the kv; only the item
    /// count matters in the test below.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }

    #[tokio::test]
    async fn test_cluster_client_adaptive_range() {
        let tx = new_client("test_cluster_client").await;
        let in_memory = tx.in_memory().unwrap();
        let cluster_client = tx.client.cluster_client().unwrap();
        let mut rng = rand::rng();

        // Generates rough 10MB data, which is larger than the default grpc message size limit.
        for i in 0..10 {
            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
            in_memory
                .put(
                    PutRequest::new()
                        .with_key(format!("__prefix/{i}").as_bytes())
                        .with_value(data.clone()),
                )
                .await
                .unwrap();
        }

        let req = RangeRequest::new().with_prefix(b"__prefix/");
        let stream =
            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();

        let res = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(10, res.len());
    }
}