meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49    AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51    RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::Client as HeartbeatClient;
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68    Result,
69};
70
71pub type Id = u64;
72
73const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
74const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
75const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79    id: Id,
80    role: Role,
81    enable_heartbeat: bool,
82    enable_store: bool,
83    enable_procedure: bool,
84    enable_access_cluster_info: bool,
85    region_follower: Option<RegionFollowerClientRef>,
86    channel_manager: Option<ChannelManager>,
87    ddl_channel_manager: Option<ChannelManager>,
88    heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92    pub fn new(member_id: u64, role: Role) -> Self {
93        Self {
94            id: member_id,
95            role,
96            ..Default::default()
97        }
98    }
99
100    /// Returns the role of Frontend's default options.
101    pub fn frontend_default_options() -> Self {
102        // Frontend does not need a member id.
103        Self::new(0, Role::Frontend)
104            .enable_store()
105            .enable_heartbeat()
106            .enable_procedure()
107            .enable_access_cluster_info()
108    }
109
110    /// Returns the role of Datanode's default options.
111    pub fn datanode_default_options(member_id: u64) -> Self {
112        Self::new(member_id, Role::Datanode)
113            .enable_store()
114            .enable_heartbeat()
115    }
116
117    /// Returns the role of Flownode's default options.
118    pub fn flownode_default_options(member_id: u64) -> Self {
119        Self::new(member_id, Role::Flownode)
120            .enable_store()
121            .enable_heartbeat()
122            .enable_procedure()
123            .enable_access_cluster_info()
124    }
125
126    pub fn enable_heartbeat(self) -> Self {
127        Self {
128            enable_heartbeat: true,
129            ..self
130        }
131    }
132
133    pub fn enable_store(self) -> Self {
134        Self {
135            enable_store: true,
136            ..self
137        }
138    }
139
140    pub fn enable_procedure(self) -> Self {
141        Self {
142            enable_procedure: true,
143            ..self
144        }
145    }
146
147    pub fn enable_access_cluster_info(self) -> Self {
148        Self {
149            enable_access_cluster_info: true,
150            ..self
151        }
152    }
153
154    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155        Self {
156            channel_manager: Some(channel_manager),
157            ..self
158        }
159    }
160
161    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162        Self {
163            ddl_channel_manager: Some(channel_manager),
164            ..self
165        }
166    }
167
168    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169        Self {
170            heartbeat_channel_manager: Some(channel_manager),
171            ..self
172        }
173    }
174
175    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176        Self {
177            region_follower: Some(region_follower),
178            ..self
179        }
180    }
181
182    pub fn build(self) -> MetaClient {
183        let mut client = if let Some(mgr) = self.channel_manager {
184            MetaClient::with_channel_manager(self.id, mgr)
185        } else {
186            MetaClient::new(self.id)
187        };
188
189        let mgr = client.channel_manager.clone();
190
191        if self.enable_heartbeat {
192            if self.heartbeat_channel_manager.is_some() {
193                info!("Enable heartbeat channel using the heartbeat channel manager.");
194            }
195            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
196            client.heartbeat = Some(HeartbeatClient::new(
197                self.id,
198                self.role,
199                mgr,
200                DEFAULT_ASK_LEADER_MAX_RETRY,
201            ));
202        }
203
204        if self.enable_store {
205            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
206        }
207
208        if self.enable_procedure {
209            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
210            client.procedure = Some(ProcedureClient::new(
211                self.id,
212                self.role,
213                mgr,
214                DEFAULT_SUBMIT_DDL_MAX_RETRY,
215            ));
216        }
217
218        if self.enable_access_cluster_info {
219            client.cluster = Some(ClusterClient::new(
220                self.id,
221                self.role,
222                mgr,
223                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
224            ))
225        }
226
227        if let Some(region_follower) = self.region_follower {
228            client.region_follower = Some(region_follower);
229        }
230
231        client
232    }
233}
234
235#[derive(Debug, Default)]
236pub struct MetaClient {
237    id: Id,
238    channel_manager: ChannelManager,
239    heartbeat: Option<HeartbeatClient>,
240    store: Option<StoreClient>,
241    procedure: Option<ProcedureClient>,
242    cluster: Option<ClusterClient>,
243    region_follower: Option<RegionFollowerClientRef>,
244}
245
246pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
247
248/// A trait for clients that can manage region followers.
249#[async_trait::async_trait]
250pub trait RegionFollowerClient: Sync + Send + Debug {
251    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
252
253    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
254
255    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
256
257    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
258
259    async fn start(&self, urls: &[&str]) -> Result<()>;
260
261    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl ProcedureExecutor for MetaClient {
266    async fn submit_ddl_task(
267        &self,
268        _ctx: &ExecutorContext,
269        request: SubmitDdlTaskRequest,
270    ) -> MetaResult<SubmitDdlTaskResponse> {
271        self.submit_ddl_task(request)
272            .await
273            .map_err(BoxedError::new)
274            .context(meta_error::ExternalSnafu)
275    }
276
277    async fn migrate_region(
278        &self,
279        _ctx: &ExecutorContext,
280        request: MigrateRegionRequest,
281    ) -> MetaResult<MigrateRegionResponse> {
282        self.migrate_region(request)
283            .await
284            .map_err(BoxedError::new)
285            .context(meta_error::ExternalSnafu)
286    }
287
288    async fn reconcile(
289        &self,
290        _ctx: &ExecutorContext,
291        request: ReconcileRequest,
292    ) -> MetaResult<ReconcileResponse> {
293        self.reconcile(request)
294            .await
295            .map_err(BoxedError::new)
296            .context(meta_error::ExternalSnafu)
297    }
298
299    async fn manage_region_follower(
300        &self,
301        _ctx: &ExecutorContext,
302        request: ManageRegionFollowerRequest,
303    ) -> MetaResult<()> {
304        if let Some(region_follower) = &self.region_follower {
305            match request {
306                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
307                    region_follower
308                        .add_region_follower(add_region_follower_request)
309                        .await
310                }
311                ManageRegionFollowerRequest::RemoveRegionFollower(
312                    remove_region_follower_request,
313                ) => {
314                    region_follower
315                        .remove_region_follower(remove_region_follower_request)
316                        .await
317                }
318                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
319                    region_follower
320                        .add_table_follower(add_table_follower_request)
321                        .await
322                }
323                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
324                    region_follower
325                        .remove_table_follower(remove_table_follower_request)
326                        .await
327                }
328            }
329            .map_err(BoxedError::new)
330            .context(meta_error::ExternalSnafu)
331        } else {
332            UnsupportedSnafu {
333                operation: "manage_region_follower",
334            }
335            .fail()
336        }
337    }
338
339    async fn query_procedure_state(
340        &self,
341        _ctx: &ExecutorContext,
342        pid: &str,
343    ) -> MetaResult<ProcedureStateResponse> {
344        self.query_procedure_state(pid)
345            .await
346            .map_err(BoxedError::new)
347            .context(meta_error::ExternalSnafu)
348    }
349
350    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
351        self.procedure_client()
352            .map_err(BoxedError::new)
353            .context(meta_error::ExternalSnafu)?
354            .list_procedures()
355            .await
356            .map_err(BoxedError::new)
357            .context(meta_error::ExternalSnafu)
358    }
359}
360
361// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
362#[allow(deprecated)]
363#[async_trait::async_trait]
364impl ClusterInfo for MetaClient {
365    type Error = Error;
366
367    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
368        let cluster_client = self.cluster_client()?;
369
370        let (get_metasrv_nodes, nodes_key_prefix) = match role {
371            None => (true, Some(NodeInfoKey::key_prefix())),
372            Some(ClusterRole::Metasrv) => (true, None),
373            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
374        };
375
376        let mut nodes = if get_metasrv_nodes {
377            let last_activity_ts = -1; // Metasrv does not provide this information.
378
379            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
380                cluster_client.get_metasrv_peers().await?;
381            followers
382                .into_iter()
383                .map(|node| {
384                    if let Some(node_info) = node.info {
385                        NodeInfo {
386                            peer: node.peer.unwrap_or_default(),
387                            last_activity_ts,
388                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
389                            version: node_info.version,
390                            git_commit: node_info.git_commit,
391                            start_time_ms: node_info.start_time_ms,
392                            total_cpu_millicores: node_info.total_cpu_millicores,
393                            total_memory_bytes: node_info.total_memory_bytes,
394                            cpu_usage_millicores: node_info.cpu_usage_millicores,
395                            memory_usage_bytes: node_info.memory_usage_bytes,
396                            hostname: node_info.hostname,
397                        }
398                    } else {
399                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
400                        NodeInfo {
401                            peer: node.peer.unwrap_or_default(),
402                            last_activity_ts,
403                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
404                            version: node.version,
405                            git_commit: node.git_commit,
406                            start_time_ms: node.start_time_ms,
407                            total_cpu_millicores: node.cpus as i64,
408                            total_memory_bytes: node.memory_bytes as i64,
409                            cpu_usage_millicores: 0,
410                            memory_usage_bytes: 0,
411                            hostname: "".to_string(),
412                        }
413                    }
414                })
415                .chain(leader.into_iter().map(|node| {
416                    if let Some(node_info) = node.info {
417                        NodeInfo {
418                            peer: node.peer.unwrap_or_default(),
419                            last_activity_ts,
420                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
421                            version: node_info.version,
422                            git_commit: node_info.git_commit,
423                            start_time_ms: node_info.start_time_ms,
424                            total_cpu_millicores: node_info.total_cpu_millicores,
425                            total_memory_bytes: node_info.total_memory_bytes,
426                            cpu_usage_millicores: node_info.cpu_usage_millicores,
427                            memory_usage_bytes: node_info.memory_usage_bytes,
428                            hostname: node_info.hostname,
429                        }
430                    } else {
431                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
432                        NodeInfo {
433                            peer: node.peer.unwrap_or_default(),
434                            last_activity_ts,
435                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
436                            version: node.version,
437                            git_commit: node.git_commit,
438                            start_time_ms: node.start_time_ms,
439                            total_cpu_millicores: node.cpus as i64,
440                            total_memory_bytes: node.memory_bytes as i64,
441                            cpu_usage_millicores: 0,
442                            memory_usage_bytes: 0,
443                            hostname: "".to_string(),
444                        }
445                    }
446                }))
447                .collect::<Vec<_>>()
448        } else {
449            Vec::new()
450        };
451
452        if let Some(prefix) = nodes_key_prefix {
453            let req = RangeRequest::new().with_prefix(prefix);
454            let res = cluster_client.range(req).await?;
455            for kv in res.kvs {
456                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
457            }
458        }
459
460        Ok(nodes)
461    }
462
463    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
464        let cluster_kv_backend = Arc::new(self.cluster_client()?);
465        let range_prefix = DatanodeStatKey::prefix_key();
466        let req = RangeRequest::new().with_prefix(range_prefix);
467        let stream =
468            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
469        let mut datanode_stats = stream
470            .try_collect::<Vec<_>>()
471            .await
472            .context(ConvertMetaResponseSnafu)?;
473        let region_stats = datanode_stats
474            .iter_mut()
475            .flat_map(|datanode_stat| {
476                let last = datanode_stat.stats.pop();
477                last.map(|stat| stat.region_stats).unwrap_or_default()
478            })
479            .collect::<Vec<_>>();
480
481        Ok(region_stats)
482    }
483
484    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
485        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
486        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
487        let flow_state_manager = FlowStateManager::new(cluster_backend);
488        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
489
490        Ok(res.map(|r| r.into()))
491    }
492}
493
494fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
495    DatanodeStatValue::try_from(kv.value)
496        .map_err(BoxedError::new)
497        .context(ExternalSnafu)
498}
499
500impl MetaClient {
501    pub fn new(id: Id) -> Self {
502        Self {
503            id,
504            ..Default::default()
505        }
506    }
507
508    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
509        Self {
510            id,
511            channel_manager,
512            ..Default::default()
513        }
514    }
515
516    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
517    where
518        U: AsRef<str>,
519        A: AsRef<[U]> + Clone,
520    {
521        info!("MetaClient channel config: {:?}", self.channel_config());
522
523        if let Some(client) = &mut self.region_follower {
524            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
525            client.start(&urls).await?;
526            info!("Region follower client started");
527        }
528        if let Some(client) = &mut self.heartbeat {
529            client.start(urls.clone()).await?;
530            info!("Heartbeat client started");
531        }
532        if let Some(client) = &mut self.store {
533            client.start(urls.clone()).await?;
534            info!("Store client started");
535        }
536        if let Some(client) = &mut self.procedure {
537            client.start(urls.clone()).await?;
538            info!("DDL client started");
539        }
540        if let Some(client) = &mut self.cluster {
541            client.start(urls).await?;
542            info!("Cluster client started");
543        }
544
545        Ok(())
546    }
547
548    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
549    pub(crate) async fn start_with<U, A>(
550        &mut self,
551        leader_provider: LeaderProviderRef,
552        peers: A,
553    ) -> Result<()>
554    where
555        U: AsRef<str>,
556        A: AsRef<[U]> + Clone,
557    {
558        if let Some(client) = &self.region_follower {
559            info!("Starting region follower client ...");
560            client.start_with(leader_provider.clone()).await?;
561        }
562
563        if let Some(client) = &self.heartbeat {
564            info!("Starting heartbeat client ...");
565            client.start_with(leader_provider.clone()).await?;
566        }
567
568        if let Some(client) = &mut self.store {
569            info!("Starting store client ...");
570            client.start(peers.clone()).await?;
571        }
572
573        if let Some(client) = &self.procedure {
574            info!("Starting procedure client ...");
575            client.start_with(leader_provider.clone()).await?;
576        }
577
578        if let Some(client) = &mut self.cluster {
579            info!("Starting cluster client ...");
580            client.start_with(leader_provider).await?;
581        }
582        Ok(())
583    }
584
585    /// Ask the leader address of `metasrv`, and the heartbeat component
586    /// needs to create a bidirectional streaming to the leader.
587    pub async fn ask_leader(&self) -> Result<String> {
588        self.heartbeat_client()?.ask_leader().await
589    }
590
591    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
592    /// other end is the leader of `metasrv`.
593    ///
594    /// The `datanode` needs to use the sender to continuously send heartbeat
595    /// packets (some self-state data), and the receiver can receive a response
596    /// from "metasrv" (which may contain some scheduling instructions).
597    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
598        self.heartbeat_client()?.heartbeat().await
599    }
600
601    /// Range gets the keys in the range from the key-value store.
602    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
603        self.store_client()?
604            .range(req.into())
605            .await?
606            .try_into()
607            .context(ConvertMetaResponseSnafu)
608    }
609
610    /// Put puts the given key into the key-value store.
611    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
612        self.store_client()?
613            .put(req.into())
614            .await?
615            .try_into()
616            .context(ConvertMetaResponseSnafu)
617    }
618
619    /// BatchGet atomically get values by the given keys from the key-value store.
620    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
621        self.store_client()?
622            .batch_get(req.into())
623            .await?
624            .try_into()
625            .context(ConvertMetaResponseSnafu)
626    }
627
628    /// BatchPut atomically puts the given keys into the key-value store.
629    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
630        self.store_client()?
631            .batch_put(req.into())
632            .await?
633            .try_into()
634            .context(ConvertMetaResponseSnafu)
635    }
636
637    /// BatchDelete atomically deletes the given keys from the key-value store.
638    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
639        self.store_client()?
640            .batch_delete(req.into())
641            .await?
642            .try_into()
643            .context(ConvertMetaResponseSnafu)
644    }
645
646    /// CompareAndPut atomically puts the value to the given updated
647    /// value if the current value == the expected value.
648    pub async fn compare_and_put(
649        &self,
650        req: CompareAndPutRequest,
651    ) -> Result<CompareAndPutResponse> {
652        self.store_client()?
653            .compare_and_put(req.into())
654            .await?
655            .try_into()
656            .context(ConvertMetaResponseSnafu)
657    }
658
659    /// DeleteRange deletes the given range from the key-value store.
660    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
661        self.store_client()?
662            .delete_range(req.into())
663            .await?
664            .try_into()
665            .context(ConvertMetaResponseSnafu)
666    }
667
668    /// Query the procedure state by its id.
669    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
670        self.procedure_client()?.query_procedure_state(pid).await
671    }
672
673    /// Submit a region migration task.
674    pub async fn migrate_region(
675        &self,
676        request: MigrateRegionRequest,
677    ) -> Result<MigrateRegionResponse> {
678        self.procedure_client()?
679            .migrate_region(
680                request.region_id,
681                request.from_peer,
682                request.to_peer,
683                request.timeout,
684            )
685            .await
686    }
687
688    /// Reconcile the procedure state.
689    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
690        self.procedure_client()?.reconcile(request).await
691    }
692
693    /// Submit a DDL task
694    pub async fn submit_ddl_task(
695        &self,
696        req: SubmitDdlTaskRequest,
697    ) -> Result<SubmitDdlTaskResponse> {
698        let res = self
699            .procedure_client()?
700            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
701            .await?
702            .try_into()
703            .context(ConvertMetaResponseSnafu)?;
704
705        Ok(res)
706    }
707
708    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
709        self.heartbeat.clone().context(NotStartedSnafu {
710            name: "heartbeat_client",
711        })
712    }
713
714    pub fn store_client(&self) -> Result<StoreClient> {
715        self.store.clone().context(NotStartedSnafu {
716            name: "store_client",
717        })
718    }
719
720    pub fn procedure_client(&self) -> Result<ProcedureClient> {
721        self.procedure.clone().context(NotStartedSnafu {
722            name: "procedure_client",
723        })
724    }
725
726    pub fn cluster_client(&self) -> Result<ClusterClient> {
727        self.cluster.clone().context(NotStartedSnafu {
728            name: "cluster_client",
729        })
730    }
731
732    pub fn channel_config(&self) -> &ChannelConfig {
733        self.channel_manager.config()
734    }
735
736    pub fn id(&self) -> Id {
737        self.id
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use std::sync::atomic::{AtomicUsize, Ordering};
744
745    use api::v1::meta::{HeartbeatRequest, Peer};
746    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
747    use rand::Rng;
748
749    use super::*;
750    use crate::error;
751    use crate::mocks::{self, MockMetaContext};
752
753    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
754
755    struct TestClient {
756        ns: String,
757        client: MetaClient,
758        meta_ctx: MockMetaContext,
759    }
760
761    impl TestClient {
762        async fn new(ns: impl Into<String>) -> Self {
763            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
764            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
765            Self {
766                ns: ns.into(),
767                client,
768                meta_ctx,
769            }
770        }
771
772        fn key(&self, name: &str) -> Vec<u8> {
773            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
774        }
775
776        async fn gen_data(&self) {
777            for i in 0..10 {
778                let req = PutRequest::new()
779                    .with_key(self.key(&format!("key-{i}")))
780                    .with_value(format!("{}-{}", "value", i).into_bytes())
781                    .with_prev_kv();
782                let res = self.client.put(req).await;
783                let _ = res.unwrap();
784            }
785        }
786
787        async fn clear_data(&self) {
788            let req =
789                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
790            let res = self.client.delete_range(req).await;
791            let _ = res.unwrap();
792        }
793
794        #[allow(dead_code)]
795        fn kv_backend(&self) -> KvBackendRef {
796            self.meta_ctx.kv_backend.clone()
797        }
798
799        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
800            self.meta_ctx.in_memory.clone()
801        }
802    }
803
804    async fn new_client(ns: impl Into<String>) -> TestClient {
805        let client = TestClient::new(ns).await;
806        client.clear_data().await;
807        client
808    }
809
810    #[tokio::test]
811    async fn test_meta_client_builder() {
812        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
813
814        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
815            .enable_heartbeat()
816            .build();
817        let _ = meta_client.heartbeat_client().unwrap();
818        assert!(meta_client.store_client().is_err());
819        meta_client.start(urls).await.unwrap();
820
821        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
822        assert!(meta_client.heartbeat_client().is_err());
823        assert!(meta_client.store_client().is_err());
824        meta_client.start(urls).await.unwrap();
825
826        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
827            .enable_store()
828            .build();
829        assert!(meta_client.heartbeat_client().is_err());
830        let _ = meta_client.store_client().unwrap();
831        meta_client.start(urls).await.unwrap();
832
833        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
834            .enable_heartbeat()
835            .enable_store()
836            .build();
837        assert_eq!(2, meta_client.id());
838        assert_eq!(2, meta_client.id());
839        let _ = meta_client.heartbeat_client().unwrap();
840        let _ = meta_client.store_client().unwrap();
841        meta_client.start(urls).await.unwrap();
842    }
843
844    #[tokio::test]
845    async fn test_not_start_heartbeat_client() {
846        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
847        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
848            .enable_store()
849            .build();
850        meta_client.start(urls).await.unwrap();
851        let res = meta_client.ask_leader().await;
852        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
853    }
854
855    #[tokio::test]
856    async fn test_not_start_store_client() {
857        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
858        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
859            .enable_heartbeat()
860            .build();
861
862        meta_client.start(urls).await.unwrap();
863        let res = meta_client.put(PutRequest::default()).await;
864        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
865    }
866
867    #[tokio::test]
868    async fn test_ask_leader() {
869        let tc = new_client("test_ask_leader").await;
870        tc.client.ask_leader().await.unwrap();
871    }
872
873    #[tokio::test]
874    async fn test_heartbeat() {
875        let tc = new_client("test_heartbeat").await;
876        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
877        // send heartbeats
878
879        let request_sent = Arc::new(AtomicUsize::new(0));
880        let request_sent_clone = request_sent.clone();
881        let _handle = tokio::spawn(async move {
882            for _ in 0..5 {
883                let req = HeartbeatRequest {
884                    peer: Some(Peer {
885                        id: 1,
886                        addr: "meta_client_peer".to_string(),
887                    }),
888                    ..Default::default()
889                };
890                sender.send(req).await.unwrap();
891                request_sent_clone.fetch_add(1, Ordering::Relaxed);
892            }
893        });
894
895        let heartbeat_count = Arc::new(AtomicUsize::new(0));
896        let heartbeat_count_clone = heartbeat_count.clone();
897        let handle = tokio::spawn(async move {
898            while let Some(_resp) = receiver.message().await.unwrap() {
899                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
900            }
901        });
902
903        handle.await.unwrap();
904        //+1 for the initial response
905        assert_eq!(
906            request_sent.load(Ordering::Relaxed) + 1,
907            heartbeat_count.load(Ordering::Relaxed)
908        );
909    }
910
911    #[tokio::test]
912    async fn test_range_get() {
913        let tc = new_client("test_range_get").await;
914        tc.gen_data().await;
915
916        let key = tc.key("key-0");
917        let req = RangeRequest::new().with_key(key.as_slice());
918        let res = tc.client.range(req).await;
919        let mut kvs = res.unwrap().take_kvs();
920        assert_eq!(1, kvs.len());
921        let mut kv = kvs.pop().unwrap();
922        assert_eq!(key, kv.take_key());
923        assert_eq!(b"value-0".to_vec(), kv.take_value());
924    }
925
926    #[tokio::test]
927    async fn test_range_get_prefix() {
928        let tc = new_client("test_range_get_prefix").await;
929        tc.gen_data().await;
930
931        let req = RangeRequest::new().with_prefix(tc.key("key-"));
932        let res = tc.client.range(req).await;
933        let kvs = res.unwrap().take_kvs();
934        assert_eq!(10, kvs.len());
935        for (i, mut kv) in kvs.into_iter().enumerate() {
936            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
937            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
938        }
939    }
940
941    #[tokio::test]
942    async fn test_range() {
943        let tc = new_client("test_range").await;
944        tc.gen_data().await;
945
946        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
947        let res = tc.client.range(req).await;
948        let kvs = res.unwrap().take_kvs();
949        assert_eq!(3, kvs.len());
950        for (i, mut kv) in kvs.into_iter().enumerate() {
951            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
952            assert_eq!(
953                format!("{}-{}", "value", i + 5).into_bytes(),
954                kv.take_value()
955            );
956        }
957    }
958
959    #[tokio::test]
960    async fn test_range_keys_only() {
961        let tc = new_client("test_range_keys_only").await;
962        tc.gen_data().await;
963
964        let req = RangeRequest::new()
965            .with_range(tc.key("key-5"), tc.key("key-8"))
966            .with_keys_only();
967        let res = tc.client.range(req).await;
968        let kvs = res.unwrap().take_kvs();
969        assert_eq!(3, kvs.len());
970        for (i, mut kv) in kvs.into_iter().enumerate() {
971            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
972            assert!(kv.take_value().is_empty());
973        }
974    }
975
976    #[tokio::test]
977    async fn test_put() {
978        let tc = new_client("test_put").await;
979
980        let req = PutRequest::new()
981            .with_key(tc.key("key"))
982            .with_value(b"value".to_vec());
983        let res = tc.client.put(req).await;
984        assert!(res.unwrap().prev_kv.is_none());
985    }
986
987    #[tokio::test]
988    async fn test_put_with_prev_kv() {
989        let tc = new_client("test_put_with_prev_kv").await;
990
991        let key = tc.key("key");
992        let req = PutRequest::new()
993            .with_key(key.as_slice())
994            .with_value(b"value".to_vec())
995            .with_prev_kv();
996        let res = tc.client.put(req).await;
997        assert!(res.unwrap().prev_kv.is_none());
998
999        let req = PutRequest::new()
1000            .with_key(key.as_slice())
1001            .with_value(b"value1".to_vec())
1002            .with_prev_kv();
1003        let res = tc.client.put(req).await;
1004        let mut kv = res.unwrap().prev_kv.unwrap();
1005        assert_eq!(key, kv.take_key());
1006        assert_eq!(b"value".to_vec(), kv.take_value());
1007    }
1008
1009    #[tokio::test]
1010    async fn test_batch_put() {
1011        let tc = new_client("test_batch_put").await;
1012
1013        let mut req = BatchPutRequest::new();
1014        for i in 0..275 {
1015            req = req.add_kv(
1016                tc.key(&format!("key-{}", i)),
1017                format!("value-{}", i).into_bytes(),
1018            );
1019        }
1020
1021        let res = tc.client.batch_put(req).await;
1022        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1023
1024        let req = RangeRequest::new().with_prefix(tc.key("key-"));
1025        let res = tc.client.range(req).await;
1026        let kvs = res.unwrap().take_kvs();
1027        assert_eq!(275, kvs.len());
1028    }
1029
1030    #[tokio::test]
1031    async fn test_batch_get() {
1032        let tc = new_client("test_batch_get").await;
1033        tc.gen_data().await;
1034
1035        let mut req = BatchGetRequest::default();
1036        for i in 0..256 {
1037            req = req.add_key(tc.key(&format!("key-{}", i)));
1038        }
1039        let res = tc.client.batch_get(req).await.unwrap();
1040        assert_eq!(10, res.kvs.len());
1041
1042        let req = BatchGetRequest::default()
1043            .add_key(tc.key("key-1"))
1044            .add_key(tc.key("key-999"));
1045        let res = tc.client.batch_get(req).await.unwrap();
1046        assert_eq!(1, res.kvs.len());
1047    }
1048
1049    #[tokio::test]
1050    async fn test_batch_put_with_prev_kv() {
1051        let tc = new_client("test_batch_put_with_prev_kv").await;
1052
1053        let key = tc.key("key");
1054        let key2 = tc.key("key2");
1055        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1056        let res = tc.client.batch_put(req).await;
1057        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1058
1059        let req = BatchPutRequest::new()
1060            .add_kv(key.as_slice(), b"value-".to_vec())
1061            .add_kv(key2.as_slice(), b"value2-".to_vec())
1062            .with_prev_kv();
1063        let res = tc.client.batch_put(req).await;
1064        let mut kvs = res.unwrap().take_prev_kvs();
1065        assert_eq!(1, kvs.len());
1066        let mut kv = kvs.pop().unwrap();
1067        assert_eq!(key, kv.take_key());
1068        assert_eq!(b"value".to_vec(), kv.take_value());
1069    }
1070
1071    #[tokio::test]
1072    async fn test_compare_and_put() {
1073        let tc = new_client("test_compare_and_put").await;
1074
1075        let key = tc.key("key");
1076        let req = CompareAndPutRequest::new()
1077            .with_key(key.as_slice())
1078            .with_expect(b"expect".to_vec())
1079            .with_value(b"value".to_vec());
1080        let res = tc.client.compare_and_put(req).await;
1081        assert!(!res.unwrap().is_success());
1082
1083        // create if absent
1084        let req = CompareAndPutRequest::new()
1085            .with_key(key.as_slice())
1086            .with_value(b"value".to_vec());
1087        let res = tc.client.compare_and_put(req).await;
1088        let mut res = res.unwrap();
1089        assert!(res.is_success());
1090        assert!(res.take_prev_kv().is_none());
1091
1092        // compare and put fail
1093        let req = CompareAndPutRequest::new()
1094            .with_key(key.as_slice())
1095            .with_expect(b"not_eq".to_vec())
1096            .with_value(b"value2".to_vec());
1097        let res = tc.client.compare_and_put(req).await;
1098        let mut res = res.unwrap();
1099        assert!(!res.is_success());
1100        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1101
1102        // compare and put success
1103        let req = CompareAndPutRequest::new()
1104            .with_key(key.as_slice())
1105            .with_expect(b"value".to_vec())
1106            .with_value(b"value2".to_vec());
1107        let res = tc.client.compare_and_put(req).await;
1108        let mut res = res.unwrap();
1109        assert!(res.is_success());
1110
1111        // If compare-and-put is success, previous value doesn't need to be returned.
1112        assert!(res.take_prev_kv().is_none());
1113    }
1114
1115    #[tokio::test]
1116    async fn test_delete_with_key() {
1117        let tc = new_client("test_delete_with_key").await;
1118        tc.gen_data().await;
1119
1120        let req = DeleteRangeRequest::new()
1121            .with_key(tc.key("key-0"))
1122            .with_prev_kv();
1123        let res = tc.client.delete_range(req).await;
1124        let mut res = res.unwrap();
1125        assert_eq!(1, res.deleted());
1126        let mut kvs = res.take_prev_kvs();
1127        assert_eq!(1, kvs.len());
1128        let mut kv = kvs.pop().unwrap();
1129        assert_eq!(b"value-0".to_vec(), kv.take_value());
1130    }
1131
1132    #[tokio::test]
1133    async fn test_delete_with_prefix() {
1134        let tc = new_client("test_delete_with_prefix").await;
1135        tc.gen_data().await;
1136
1137        let req = DeleteRangeRequest::new()
1138            .with_prefix(tc.key("key-"))
1139            .with_prev_kv();
1140        let res = tc.client.delete_range(req).await;
1141        let mut res = res.unwrap();
1142        assert_eq!(10, res.deleted());
1143        let kvs = res.take_prev_kvs();
1144        assert_eq!(10, kvs.len());
1145        for (i, mut kv) in kvs.into_iter().enumerate() {
1146            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1147        }
1148    }
1149
1150    #[tokio::test]
1151    async fn test_delete_with_range() {
1152        let tc = new_client("test_delete_with_range").await;
1153        tc.gen_data().await;
1154
1155        let req = DeleteRangeRequest::new()
1156            .with_range(tc.key("key-2"), tc.key("key-7"))
1157            .with_prev_kv();
1158        let res = tc.client.delete_range(req).await;
1159        let mut res = res.unwrap();
1160        assert_eq!(5, res.deleted());
1161        let kvs = res.take_prev_kvs();
1162        assert_eq!(5, kvs.len());
1163        for (i, mut kv) in kvs.into_iter().enumerate() {
1164            assert_eq!(
1165                format!("{}-{}", "value", i + 2).into_bytes(),
1166                kv.take_value()
1167            );
1168        }
1169    }
1170
1171    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1172        Ok(())
1173    }
1174
1175    #[tokio::test]
1176    async fn test_cluster_client_adaptive_range() {
1177        let tx = new_client("test_cluster_client").await;
1178        let in_memory = tx.in_memory().unwrap();
1179        let cluster_client = tx.client.cluster_client().unwrap();
1180        let mut rng = rand::rng();
1181
1182        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1183        for i in 0..10 {
1184            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1185            in_memory
1186                .put(
1187                    PutRequest::new()
1188                        .with_key(format!("__prefix/{i}").as_bytes())
1189                        .with_value(data.clone()),
1190                )
1191                .await
1192                .unwrap();
1193        }
1194
1195        let req = RangeRequest::new().with_prefix(b"__prefix/");
1196        let stream =
1197            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1198
1199        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1200        assert_eq!(10, res.len());
1201    }
1202}