meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49    AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51    RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::Client as HeartbeatClient;
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68    Result,
69};
70
/// Member id of the node that owns a [MetaClient] (e.g. a datanode or flownode id).
pub type Id = u64;

/// Retry budget shared by all sub-clients created by [MetaClientBuilder].
const DEFAULT_MAX_RETRY: usize = 3;
/// Max retries the heartbeat client uses when asking for the Metasrv leader.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = DEFAULT_MAX_RETRY;
/// Max retries the procedure client uses when submitting a DDL task.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = DEFAULT_MAX_RETRY;
/// Max retries used by the cluster client.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = DEFAULT_MAX_RETRY;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79    id: Id,
80    role: Role,
81    enable_heartbeat: bool,
82    enable_store: bool,
83    enable_procedure: bool,
84    enable_access_cluster_info: bool,
85    region_follower: Option<RegionFollowerClientRef>,
86    channel_manager: Option<ChannelManager>,
87    ddl_channel_manager: Option<ChannelManager>,
88    heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92    pub fn new(member_id: u64, role: Role) -> Self {
93        Self {
94            id: member_id,
95            role,
96            ..Default::default()
97        }
98    }
99
100    /// Returns the role of Frontend's default options.
101    pub fn frontend_default_options() -> Self {
102        // Frontend does not need a member id.
103        Self::new(0, Role::Frontend)
104            .enable_store()
105            .enable_heartbeat()
106            .enable_procedure()
107            .enable_access_cluster_info()
108    }
109
110    /// Returns the role of Datanode's default options.
111    pub fn datanode_default_options(member_id: u64) -> Self {
112        Self::new(member_id, Role::Datanode)
113            .enable_store()
114            .enable_heartbeat()
115    }
116
117    /// Returns the role of Flownode's default options.
118    pub fn flownode_default_options(member_id: u64) -> Self {
119        Self::new(member_id, Role::Flownode)
120            .enable_store()
121            .enable_heartbeat()
122            .enable_procedure()
123            .enable_access_cluster_info()
124    }
125
126    pub fn enable_heartbeat(self) -> Self {
127        Self {
128            enable_heartbeat: true,
129            ..self
130        }
131    }
132
133    pub fn enable_store(self) -> Self {
134        Self {
135            enable_store: true,
136            ..self
137        }
138    }
139
140    pub fn enable_procedure(self) -> Self {
141        Self {
142            enable_procedure: true,
143            ..self
144        }
145    }
146
147    pub fn enable_access_cluster_info(self) -> Self {
148        Self {
149            enable_access_cluster_info: true,
150            ..self
151        }
152    }
153
154    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155        Self {
156            channel_manager: Some(channel_manager),
157            ..self
158        }
159    }
160
161    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162        Self {
163            ddl_channel_manager: Some(channel_manager),
164            ..self
165        }
166    }
167
168    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169        Self {
170            heartbeat_channel_manager: Some(channel_manager),
171            ..self
172        }
173    }
174
175    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176        Self {
177            region_follower: Some(region_follower),
178            ..self
179        }
180    }
181
182    pub fn build(self) -> MetaClient {
183        let mut client = if let Some(mgr) = self.channel_manager {
184            MetaClient::with_channel_manager(self.id, mgr)
185        } else {
186            MetaClient::new(self.id)
187        };
188
189        let mgr = client.channel_manager.clone();
190
191        if self.enable_heartbeat {
192            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
193            client.heartbeat = Some(HeartbeatClient::new(
194                self.id,
195                self.role,
196                mgr,
197                DEFAULT_ASK_LEADER_MAX_RETRY,
198            ));
199        }
200
201        if self.enable_store {
202            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
203        }
204
205        if self.enable_procedure {
206            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
207            client.procedure = Some(ProcedureClient::new(
208                self.id,
209                self.role,
210                mgr,
211                DEFAULT_SUBMIT_DDL_MAX_RETRY,
212            ));
213        }
214
215        if self.enable_access_cluster_info {
216            client.cluster = Some(ClusterClient::new(
217                self.id,
218                self.role,
219                mgr,
220                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
221            ))
222        }
223
224        if let Some(region_follower) = self.region_follower {
225            client.region_follower = Some(region_follower);
226        }
227
228        client
229    }
230}
231
/// Client for Metasrv, composed of optional sub-clients.
///
/// Each sub-client is `None` unless it was enabled in [MetaClientBuilder]. Accessing a
/// missing one (e.g. through [MetaClient::store_client]) returns a `NotStarted` error.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Member id of the node that owns this client.
    id: Id,
    /// Default channel manager; its config is logged by [MetaClient::start].
    channel_manager: ChannelManager,
    /// Heartbeat client; also used to ask for the Metasrv leader.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store client.
    store: Option<StoreClient>,
    /// Procedure client (DDL tasks, region migration, reconcile, procedure queries).
    procedure: Option<ProcedureClient>,
    /// Cluster client used to list nodes and read region/flow statistics.
    cluster: Option<ClusterClient>,
    /// Optional client that handles region/table follower management requests.
    region_follower: Option<RegionFollowerClientRef>,
}
242
/// Shared, dynamically dispatched handle to a [RegionFollowerClient].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
///
/// An implementation can be attached to a [MetaClient] through
/// [MetaClientBuilder::with_region_follower]. The [MetaClient] then forwards
/// `ManageRegionFollowerRequest`s to it and starts it together with its other sub-clients.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an add-region-follower request.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a remove-region-follower request.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Handles an add-table-follower request.
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Handles a remove-table-follower request.
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client with the same `urls` that were passed to `MetaClient::start`.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a leader provider; called by `MetaClient::start_with`.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
260
261#[async_trait::async_trait]
262impl ProcedureExecutor for MetaClient {
263    async fn submit_ddl_task(
264        &self,
265        _ctx: &ExecutorContext,
266        request: SubmitDdlTaskRequest,
267    ) -> MetaResult<SubmitDdlTaskResponse> {
268        self.submit_ddl_task(request)
269            .await
270            .map_err(BoxedError::new)
271            .context(meta_error::ExternalSnafu)
272    }
273
274    async fn migrate_region(
275        &self,
276        _ctx: &ExecutorContext,
277        request: MigrateRegionRequest,
278    ) -> MetaResult<MigrateRegionResponse> {
279        self.migrate_region(request)
280            .await
281            .map_err(BoxedError::new)
282            .context(meta_error::ExternalSnafu)
283    }
284
285    async fn reconcile(
286        &self,
287        _ctx: &ExecutorContext,
288        request: ReconcileRequest,
289    ) -> MetaResult<ReconcileResponse> {
290        self.reconcile(request)
291            .await
292            .map_err(BoxedError::new)
293            .context(meta_error::ExternalSnafu)
294    }
295
296    async fn manage_region_follower(
297        &self,
298        _ctx: &ExecutorContext,
299        request: ManageRegionFollowerRequest,
300    ) -> MetaResult<()> {
301        if let Some(region_follower) = &self.region_follower {
302            match request {
303                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
304                    region_follower
305                        .add_region_follower(add_region_follower_request)
306                        .await
307                }
308                ManageRegionFollowerRequest::RemoveRegionFollower(
309                    remove_region_follower_request,
310                ) => {
311                    region_follower
312                        .remove_region_follower(remove_region_follower_request)
313                        .await
314                }
315                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
316                    region_follower
317                        .add_table_follower(add_table_follower_request)
318                        .await
319                }
320                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
321                    region_follower
322                        .remove_table_follower(remove_table_follower_request)
323                        .await
324                }
325            }
326            .map_err(BoxedError::new)
327            .context(meta_error::ExternalSnafu)
328        } else {
329            UnsupportedSnafu {
330                operation: "manage_region_follower",
331            }
332            .fail()
333        }
334    }
335
336    async fn query_procedure_state(
337        &self,
338        _ctx: &ExecutorContext,
339        pid: &str,
340    ) -> MetaResult<ProcedureStateResponse> {
341        self.query_procedure_state(pid)
342            .await
343            .map_err(BoxedError::new)
344            .context(meta_error::ExternalSnafu)
345    }
346
347    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
348        self.procedure_client()
349            .map_err(BoxedError::new)
350            .context(meta_error::ExternalSnafu)?
351            .list_procedures()
352            .await
353            .map_err(BoxedError::new)
354            .context(meta_error::ExternalSnafu)
355    }
356}
357
358// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
359#[allow(deprecated)]
360#[async_trait::async_trait]
361impl ClusterInfo for MetaClient {
362    type Error = Error;
363
364    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
365        let cluster_client = self.cluster_client()?;
366
367        let (get_metasrv_nodes, nodes_key_prefix) = match role {
368            None => (true, Some(NodeInfoKey::key_prefix())),
369            Some(ClusterRole::Metasrv) => (true, None),
370            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
371        };
372
373        let mut nodes = if get_metasrv_nodes {
374            let last_activity_ts = -1; // Metasrv does not provide this information.
375
376            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
377                cluster_client.get_metasrv_peers().await?;
378            followers
379                .into_iter()
380                .map(|node| {
381                    if let Some(node_info) = node.info {
382                        NodeInfo {
383                            peer: node.peer.unwrap_or_default(),
384                            last_activity_ts,
385                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
386                            version: node_info.version,
387                            git_commit: node_info.git_commit,
388                            start_time_ms: node_info.start_time_ms,
389                            total_cpu_millicores: node_info.total_cpu_millicores,
390                            total_memory_bytes: node_info.total_memory_bytes,
391                            cpu_usage_millicores: node_info.cpu_usage_millicores,
392                            memory_usage_bytes: node_info.memory_usage_bytes,
393                            hostname: node_info.hostname,
394                        }
395                    } else {
396                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
397                        NodeInfo {
398                            peer: node.peer.unwrap_or_default(),
399                            last_activity_ts,
400                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
401                            version: node.version,
402                            git_commit: node.git_commit,
403                            start_time_ms: node.start_time_ms,
404                            total_cpu_millicores: node.cpus as i64,
405                            total_memory_bytes: node.memory_bytes as i64,
406                            cpu_usage_millicores: 0,
407                            memory_usage_bytes: 0,
408                            hostname: "".to_string(),
409                        }
410                    }
411                })
412                .chain(leader.into_iter().map(|node| {
413                    if let Some(node_info) = node.info {
414                        NodeInfo {
415                            peer: node.peer.unwrap_or_default(),
416                            last_activity_ts,
417                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
418                            version: node_info.version,
419                            git_commit: node_info.git_commit,
420                            start_time_ms: node_info.start_time_ms,
421                            total_cpu_millicores: node_info.total_cpu_millicores,
422                            total_memory_bytes: node_info.total_memory_bytes,
423                            cpu_usage_millicores: node_info.cpu_usage_millicores,
424                            memory_usage_bytes: node_info.memory_usage_bytes,
425                            hostname: node_info.hostname,
426                        }
427                    } else {
428                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
429                        NodeInfo {
430                            peer: node.peer.unwrap_or_default(),
431                            last_activity_ts,
432                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
433                            version: node.version,
434                            git_commit: node.git_commit,
435                            start_time_ms: node.start_time_ms,
436                            total_cpu_millicores: node.cpus as i64,
437                            total_memory_bytes: node.memory_bytes as i64,
438                            cpu_usage_millicores: 0,
439                            memory_usage_bytes: 0,
440                            hostname: "".to_string(),
441                        }
442                    }
443                }))
444                .collect::<Vec<_>>()
445        } else {
446            Vec::new()
447        };
448
449        if let Some(prefix) = nodes_key_prefix {
450            let req = RangeRequest::new().with_prefix(prefix);
451            let res = cluster_client.range(req).await?;
452            for kv in res.kvs {
453                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
454            }
455        }
456
457        Ok(nodes)
458    }
459
460    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
461        let cluster_kv_backend = Arc::new(self.cluster_client()?);
462        let range_prefix = DatanodeStatKey::prefix_key();
463        let req = RangeRequest::new().with_prefix(range_prefix);
464        let stream =
465            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
466        let mut datanode_stats = stream
467            .try_collect::<Vec<_>>()
468            .await
469            .context(ConvertMetaResponseSnafu)?;
470        let region_stats = datanode_stats
471            .iter_mut()
472            .flat_map(|datanode_stat| {
473                let last = datanode_stat.stats.pop();
474                last.map(|stat| stat.region_stats).unwrap_or_default()
475            })
476            .collect::<Vec<_>>();
477
478        Ok(region_stats)
479    }
480
481    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
482        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
483        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
484        let flow_state_manager = FlowStateManager::new(cluster_backend);
485        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
486
487        Ok(res.map(|r| r.into()))
488    }
489}
490
491fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
492    DatanodeStatValue::try_from(kv.value)
493        .map_err(BoxedError::new)
494        .context(ExternalSnafu)
495}
496
497impl MetaClient {
498    pub fn new(id: Id) -> Self {
499        Self {
500            id,
501            ..Default::default()
502        }
503    }
504
505    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
506        Self {
507            id,
508            channel_manager,
509            ..Default::default()
510        }
511    }
512
513    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
514    where
515        U: AsRef<str>,
516        A: AsRef<[U]> + Clone,
517    {
518        info!("MetaClient channel config: {:?}", self.channel_config());
519
520        if let Some(client) = &mut self.region_follower {
521            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
522            client.start(&urls).await?;
523            info!("Region follower client started");
524        }
525        if let Some(client) = &mut self.heartbeat {
526            client.start(urls.clone()).await?;
527            info!("Heartbeat client started");
528        }
529        if let Some(client) = &mut self.store {
530            client.start(urls.clone()).await?;
531            info!("Store client started");
532        }
533        if let Some(client) = &mut self.procedure {
534            client.start(urls.clone()).await?;
535            info!("DDL client started");
536        }
537        if let Some(client) = &mut self.cluster {
538            client.start(urls).await?;
539            info!("Cluster client started");
540        }
541
542        Ok(())
543    }
544
545    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
546    pub(crate) async fn start_with<U, A>(
547        &mut self,
548        leader_provider: LeaderProviderRef,
549        peers: A,
550    ) -> Result<()>
551    where
552        U: AsRef<str>,
553        A: AsRef<[U]> + Clone,
554    {
555        if let Some(client) = &self.region_follower {
556            info!("Starting region follower client ...");
557            client.start_with(leader_provider.clone()).await?;
558        }
559
560        if let Some(client) = &self.heartbeat {
561            info!("Starting heartbeat client ...");
562            client.start_with(leader_provider.clone()).await?;
563        }
564
565        if let Some(client) = &mut self.store {
566            info!("Starting store client ...");
567            client.start(peers.clone()).await?;
568        }
569
570        if let Some(client) = &self.procedure {
571            info!("Starting procedure client ...");
572            client.start_with(leader_provider.clone()).await?;
573        }
574
575        if let Some(client) = &mut self.cluster {
576            info!("Starting cluster client ...");
577            client.start_with(leader_provider).await?;
578        }
579        Ok(())
580    }
581
582    /// Ask the leader address of `metasrv`, and the heartbeat component
583    /// needs to create a bidirectional streaming to the leader.
584    pub async fn ask_leader(&self) -> Result<String> {
585        self.heartbeat_client()?.ask_leader().await
586    }
587
588    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
589    /// other end is the leader of `metasrv`.
590    ///
591    /// The `datanode` needs to use the sender to continuously send heartbeat
592    /// packets (some self-state data), and the receiver can receive a response
593    /// from "metasrv" (which may contain some scheduling instructions).
594    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
595        self.heartbeat_client()?.heartbeat().await
596    }
597
598    /// Range gets the keys in the range from the key-value store.
599    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
600        self.store_client()?
601            .range(req.into())
602            .await?
603            .try_into()
604            .context(ConvertMetaResponseSnafu)
605    }
606
607    /// Put puts the given key into the key-value store.
608    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
609        self.store_client()?
610            .put(req.into())
611            .await?
612            .try_into()
613            .context(ConvertMetaResponseSnafu)
614    }
615
616    /// BatchGet atomically get values by the given keys from the key-value store.
617    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
618        self.store_client()?
619            .batch_get(req.into())
620            .await?
621            .try_into()
622            .context(ConvertMetaResponseSnafu)
623    }
624
625    /// BatchPut atomically puts the given keys into the key-value store.
626    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
627        self.store_client()?
628            .batch_put(req.into())
629            .await?
630            .try_into()
631            .context(ConvertMetaResponseSnafu)
632    }
633
634    /// BatchDelete atomically deletes the given keys from the key-value store.
635    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
636        self.store_client()?
637            .batch_delete(req.into())
638            .await?
639            .try_into()
640            .context(ConvertMetaResponseSnafu)
641    }
642
643    /// CompareAndPut atomically puts the value to the given updated
644    /// value if the current value == the expected value.
645    pub async fn compare_and_put(
646        &self,
647        req: CompareAndPutRequest,
648    ) -> Result<CompareAndPutResponse> {
649        self.store_client()?
650            .compare_and_put(req.into())
651            .await?
652            .try_into()
653            .context(ConvertMetaResponseSnafu)
654    }
655
656    /// DeleteRange deletes the given range from the key-value store.
657    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
658        self.store_client()?
659            .delete_range(req.into())
660            .await?
661            .try_into()
662            .context(ConvertMetaResponseSnafu)
663    }
664
665    /// Query the procedure state by its id.
666    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
667        self.procedure_client()?.query_procedure_state(pid).await
668    }
669
670    /// Submit a region migration task.
671    pub async fn migrate_region(
672        &self,
673        request: MigrateRegionRequest,
674    ) -> Result<MigrateRegionResponse> {
675        self.procedure_client()?
676            .migrate_region(
677                request.region_id,
678                request.from_peer,
679                request.to_peer,
680                request.timeout,
681            )
682            .await
683    }
684
685    /// Reconcile the procedure state.
686    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
687        self.procedure_client()?.reconcile(request).await
688    }
689
690    /// Submit a DDL task
691    pub async fn submit_ddl_task(
692        &self,
693        req: SubmitDdlTaskRequest,
694    ) -> Result<SubmitDdlTaskResponse> {
695        let res = self
696            .procedure_client()?
697            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
698            .await?
699            .try_into()
700            .context(ConvertMetaResponseSnafu)?;
701
702        Ok(res)
703    }
704
705    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
706        self.heartbeat.clone().context(NotStartedSnafu {
707            name: "heartbeat_client",
708        })
709    }
710
711    pub fn store_client(&self) -> Result<StoreClient> {
712        self.store.clone().context(NotStartedSnafu {
713            name: "store_client",
714        })
715    }
716
717    pub fn procedure_client(&self) -> Result<ProcedureClient> {
718        self.procedure.clone().context(NotStartedSnafu {
719            name: "procedure_client",
720        })
721    }
722
723    pub fn cluster_client(&self) -> Result<ClusterClient> {
724        self.cluster.clone().context(NotStartedSnafu {
725            name: "cluster_client",
726        })
727    }
728
729    pub fn channel_config(&self) -> &ChannelConfig {
730        self.channel_manager.config()
731    }
732
733    pub fn id(&self) -> Id {
734        self.id
735    }
736}
737
738#[cfg(test)]
739mod tests {
740    use std::sync::atomic::{AtomicUsize, Ordering};
741
742    use api::v1::meta::{HeartbeatRequest, Peer};
743    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
744    use rand::Rng;
745
746    use super::*;
747    use crate::error;
748    use crate::mocks::{self, MockMetaContext};
749
750    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
751
752    struct TestClient {
753        ns: String,
754        client: MetaClient,
755        meta_ctx: MockMetaContext,
756    }
757
758    impl TestClient {
759        async fn new(ns: impl Into<String>) -> Self {
760            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
761            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
762            Self {
763                ns: ns.into(),
764                client,
765                meta_ctx,
766            }
767        }
768
769        fn key(&self, name: &str) -> Vec<u8> {
770            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
771        }
772
773        async fn gen_data(&self) {
774            for i in 0..10 {
775                let req = PutRequest::new()
776                    .with_key(self.key(&format!("key-{i}")))
777                    .with_value(format!("{}-{}", "value", i).into_bytes())
778                    .with_prev_kv();
779                let res = self.client.put(req).await;
780                let _ = res.unwrap();
781            }
782        }
783
784        async fn clear_data(&self) {
785            let req =
786                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
787            let res = self.client.delete_range(req).await;
788            let _ = res.unwrap();
789        }
790
791        #[allow(dead_code)]
792        fn kv_backend(&self) -> KvBackendRef {
793            self.meta_ctx.kv_backend.clone()
794        }
795
796        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
797            self.meta_ctx.in_memory.clone()
798        }
799    }
800
801    async fn new_client(ns: impl Into<String>) -> TestClient {
802        let client = TestClient::new(ns).await;
803        client.clear_data().await;
804        client
805    }
806
807    #[tokio::test]
808    async fn test_meta_client_builder() {
809        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
810
811        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
812            .enable_heartbeat()
813            .build();
814        let _ = meta_client.heartbeat_client().unwrap();
815        assert!(meta_client.store_client().is_err());
816        meta_client.start(urls).await.unwrap();
817
818        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
819        assert!(meta_client.heartbeat_client().is_err());
820        assert!(meta_client.store_client().is_err());
821        meta_client.start(urls).await.unwrap();
822
823        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
824            .enable_store()
825            .build();
826        assert!(meta_client.heartbeat_client().is_err());
827        let _ = meta_client.store_client().unwrap();
828        meta_client.start(urls).await.unwrap();
829
830        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
831            .enable_heartbeat()
832            .enable_store()
833            .build();
834        assert_eq!(2, meta_client.id());
835        assert_eq!(2, meta_client.id());
836        let _ = meta_client.heartbeat_client().unwrap();
837        let _ = meta_client.store_client().unwrap();
838        meta_client.start(urls).await.unwrap();
839    }
840
841    #[tokio::test]
842    async fn test_not_start_heartbeat_client() {
843        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
844        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
845            .enable_store()
846            .build();
847        meta_client.start(urls).await.unwrap();
848        let res = meta_client.ask_leader().await;
849        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
850    }
851
852    #[tokio::test]
853    async fn test_not_start_store_client() {
854        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
855        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
856            .enable_heartbeat()
857            .build();
858
859        meta_client.start(urls).await.unwrap();
860        let res = meta_client.put(PutRequest::default()).await;
861        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
862    }
863
864    #[tokio::test]
865    async fn test_ask_leader() {
866        let tc = new_client("test_ask_leader").await;
867        tc.client.ask_leader().await.unwrap();
868    }
869
870    #[tokio::test]
871    async fn test_heartbeat() {
872        let tc = new_client("test_heartbeat").await;
873        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
874        // send heartbeats
875
876        let request_sent = Arc::new(AtomicUsize::new(0));
877        let request_sent_clone = request_sent.clone();
878        let _handle = tokio::spawn(async move {
879            for _ in 0..5 {
880                let req = HeartbeatRequest {
881                    peer: Some(Peer {
882                        id: 1,
883                        addr: "meta_client_peer".to_string(),
884                    }),
885                    ..Default::default()
886                };
887                sender.send(req).await.unwrap();
888                request_sent_clone.fetch_add(1, Ordering::Relaxed);
889            }
890        });
891
892        let heartbeat_count = Arc::new(AtomicUsize::new(0));
893        let heartbeat_count_clone = heartbeat_count.clone();
894        let handle = tokio::spawn(async move {
895            while let Some(_resp) = receiver.message().await.unwrap() {
896                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
897            }
898        });
899
900        handle.await.unwrap();
901        //+1 for the initial response
902        assert_eq!(
903            request_sent.load(Ordering::Relaxed) + 1,
904            heartbeat_count.load(Ordering::Relaxed)
905        );
906    }
907
908    #[tokio::test]
909    async fn test_range_get() {
910        let tc = new_client("test_range_get").await;
911        tc.gen_data().await;
912
913        let key = tc.key("key-0");
914        let req = RangeRequest::new().with_key(key.as_slice());
915        let res = tc.client.range(req).await;
916        let mut kvs = res.unwrap().take_kvs();
917        assert_eq!(1, kvs.len());
918        let mut kv = kvs.pop().unwrap();
919        assert_eq!(key, kv.take_key());
920        assert_eq!(b"value-0".to_vec(), kv.take_value());
921    }
922
923    #[tokio::test]
924    async fn test_range_get_prefix() {
925        let tc = new_client("test_range_get_prefix").await;
926        tc.gen_data().await;
927
928        let req = RangeRequest::new().with_prefix(tc.key("key-"));
929        let res = tc.client.range(req).await;
930        let kvs = res.unwrap().take_kvs();
931        assert_eq!(10, kvs.len());
932        for (i, mut kv) in kvs.into_iter().enumerate() {
933            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
934            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
935        }
936    }
937
938    #[tokio::test]
939    async fn test_range() {
940        let tc = new_client("test_range").await;
941        tc.gen_data().await;
942
943        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
944        let res = tc.client.range(req).await;
945        let kvs = res.unwrap().take_kvs();
946        assert_eq!(3, kvs.len());
947        for (i, mut kv) in kvs.into_iter().enumerate() {
948            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
949            assert_eq!(
950                format!("{}-{}", "value", i + 5).into_bytes(),
951                kv.take_value()
952            );
953        }
954    }
955
956    #[tokio::test]
957    async fn test_range_keys_only() {
958        let tc = new_client("test_range_keys_only").await;
959        tc.gen_data().await;
960
961        let req = RangeRequest::new()
962            .with_range(tc.key("key-5"), tc.key("key-8"))
963            .with_keys_only();
964        let res = tc.client.range(req).await;
965        let kvs = res.unwrap().take_kvs();
966        assert_eq!(3, kvs.len());
967        for (i, mut kv) in kvs.into_iter().enumerate() {
968            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
969            assert!(kv.take_value().is_empty());
970        }
971    }
972
973    #[tokio::test]
974    async fn test_put() {
975        let tc = new_client("test_put").await;
976
977        let req = PutRequest::new()
978            .with_key(tc.key("key"))
979            .with_value(b"value".to_vec());
980        let res = tc.client.put(req).await;
981        assert!(res.unwrap().prev_kv.is_none());
982    }
983
984    #[tokio::test]
985    async fn test_put_with_prev_kv() {
986        let tc = new_client("test_put_with_prev_kv").await;
987
988        let key = tc.key("key");
989        let req = PutRequest::new()
990            .with_key(key.as_slice())
991            .with_value(b"value".to_vec())
992            .with_prev_kv();
993        let res = tc.client.put(req).await;
994        assert!(res.unwrap().prev_kv.is_none());
995
996        let req = PutRequest::new()
997            .with_key(key.as_slice())
998            .with_value(b"value1".to_vec())
999            .with_prev_kv();
1000        let res = tc.client.put(req).await;
1001        let mut kv = res.unwrap().prev_kv.unwrap();
1002        assert_eq!(key, kv.take_key());
1003        assert_eq!(b"value".to_vec(), kv.take_value());
1004    }
1005
1006    #[tokio::test]
1007    async fn test_batch_put() {
1008        let tc = new_client("test_batch_put").await;
1009
1010        let mut req = BatchPutRequest::new();
1011        for i in 0..275 {
1012            req = req.add_kv(
1013                tc.key(&format!("key-{}", i)),
1014                format!("value-{}", i).into_bytes(),
1015            );
1016        }
1017
1018        let res = tc.client.batch_put(req).await;
1019        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1020
1021        let req = RangeRequest::new().with_prefix(tc.key("key-"));
1022        let res = tc.client.range(req).await;
1023        let kvs = res.unwrap().take_kvs();
1024        assert_eq!(275, kvs.len());
1025    }
1026
1027    #[tokio::test]
1028    async fn test_batch_get() {
1029        let tc = new_client("test_batch_get").await;
1030        tc.gen_data().await;
1031
1032        let mut req = BatchGetRequest::default();
1033        for i in 0..256 {
1034            req = req.add_key(tc.key(&format!("key-{}", i)));
1035        }
1036        let res = tc.client.batch_get(req).await.unwrap();
1037        assert_eq!(10, res.kvs.len());
1038
1039        let req = BatchGetRequest::default()
1040            .add_key(tc.key("key-1"))
1041            .add_key(tc.key("key-999"));
1042        let res = tc.client.batch_get(req).await.unwrap();
1043        assert_eq!(1, res.kvs.len());
1044    }
1045
1046    #[tokio::test]
1047    async fn test_batch_put_with_prev_kv() {
1048        let tc = new_client("test_batch_put_with_prev_kv").await;
1049
1050        let key = tc.key("key");
1051        let key2 = tc.key("key2");
1052        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1053        let res = tc.client.batch_put(req).await;
1054        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1055
1056        let req = BatchPutRequest::new()
1057            .add_kv(key.as_slice(), b"value-".to_vec())
1058            .add_kv(key2.as_slice(), b"value2-".to_vec())
1059            .with_prev_kv();
1060        let res = tc.client.batch_put(req).await;
1061        let mut kvs = res.unwrap().take_prev_kvs();
1062        assert_eq!(1, kvs.len());
1063        let mut kv = kvs.pop().unwrap();
1064        assert_eq!(key, kv.take_key());
1065        assert_eq!(b"value".to_vec(), kv.take_value());
1066    }
1067
1068    #[tokio::test]
1069    async fn test_compare_and_put() {
1070        let tc = new_client("test_compare_and_put").await;
1071
1072        let key = tc.key("key");
1073        let req = CompareAndPutRequest::new()
1074            .with_key(key.as_slice())
1075            .with_expect(b"expect".to_vec())
1076            .with_value(b"value".to_vec());
1077        let res = tc.client.compare_and_put(req).await;
1078        assert!(!res.unwrap().is_success());
1079
1080        // create if absent
1081        let req = CompareAndPutRequest::new()
1082            .with_key(key.as_slice())
1083            .with_value(b"value".to_vec());
1084        let res = tc.client.compare_and_put(req).await;
1085        let mut res = res.unwrap();
1086        assert!(res.is_success());
1087        assert!(res.take_prev_kv().is_none());
1088
1089        // compare and put fail
1090        let req = CompareAndPutRequest::new()
1091            .with_key(key.as_slice())
1092            .with_expect(b"not_eq".to_vec())
1093            .with_value(b"value2".to_vec());
1094        let res = tc.client.compare_and_put(req).await;
1095        let mut res = res.unwrap();
1096        assert!(!res.is_success());
1097        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1098
1099        // compare and put success
1100        let req = CompareAndPutRequest::new()
1101            .with_key(key.as_slice())
1102            .with_expect(b"value".to_vec())
1103            .with_value(b"value2".to_vec());
1104        let res = tc.client.compare_and_put(req).await;
1105        let mut res = res.unwrap();
1106        assert!(res.is_success());
1107
1108        // If compare-and-put is success, previous value doesn't need to be returned.
1109        assert!(res.take_prev_kv().is_none());
1110    }
1111
1112    #[tokio::test]
1113    async fn test_delete_with_key() {
1114        let tc = new_client("test_delete_with_key").await;
1115        tc.gen_data().await;
1116
1117        let req = DeleteRangeRequest::new()
1118            .with_key(tc.key("key-0"))
1119            .with_prev_kv();
1120        let res = tc.client.delete_range(req).await;
1121        let mut res = res.unwrap();
1122        assert_eq!(1, res.deleted());
1123        let mut kvs = res.take_prev_kvs();
1124        assert_eq!(1, kvs.len());
1125        let mut kv = kvs.pop().unwrap();
1126        assert_eq!(b"value-0".to_vec(), kv.take_value());
1127    }
1128
1129    #[tokio::test]
1130    async fn test_delete_with_prefix() {
1131        let tc = new_client("test_delete_with_prefix").await;
1132        tc.gen_data().await;
1133
1134        let req = DeleteRangeRequest::new()
1135            .with_prefix(tc.key("key-"))
1136            .with_prev_kv();
1137        let res = tc.client.delete_range(req).await;
1138        let mut res = res.unwrap();
1139        assert_eq!(10, res.deleted());
1140        let kvs = res.take_prev_kvs();
1141        assert_eq!(10, kvs.len());
1142        for (i, mut kv) in kvs.into_iter().enumerate() {
1143            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1144        }
1145    }
1146
1147    #[tokio::test]
1148    async fn test_delete_with_range() {
1149        let tc = new_client("test_delete_with_range").await;
1150        tc.gen_data().await;
1151
1152        let req = DeleteRangeRequest::new()
1153            .with_range(tc.key("key-2"), tc.key("key-7"))
1154            .with_prev_kv();
1155        let res = tc.client.delete_range(req).await;
1156        let mut res = res.unwrap();
1157        assert_eq!(5, res.deleted());
1158        let kvs = res.take_prev_kvs();
1159        assert_eq!(5, kvs.len());
1160        for (i, mut kv) in kvs.into_iter().enumerate() {
1161            assert_eq!(
1162                format!("{}-{}", "value", i + 2).into_bytes(),
1163                kv.take_value()
1164            );
1165        }
1166    }
1167
1168    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1169        Ok(())
1170    }
1171
1172    #[tokio::test]
1173    async fn test_cluster_client_adaptive_range() {
1174        let tx = new_client("test_cluster_client").await;
1175        let in_memory = tx.in_memory().unwrap();
1176        let cluster_client = tx.client.cluster_client().unwrap();
1177        let mut rng = rand::rng();
1178
1179        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1180        for i in 0..10 {
1181            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1182            in_memory
1183                .put(
1184                    PutRequest::new()
1185                        .with_key(format!("__prefix/{i}").as_bytes())
1186                        .with_value(data.clone()),
1187                )
1188                .await
1189                .unwrap();
1190        }
1191
1192        let req = RangeRequest::new().with_prefix(b"__prefix/");
1193        let stream =
1194            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1195
1196        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1197        assert_eq!(10, res.len());
1198    }
1199}