// meta_client/client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16pub mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26use std::time::Duration;
27
28use api::v1::meta::{
29    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
30};
31pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
32use cluster::Client as ClusterClient;
33pub use cluster::ClusterKvBackend;
34use common_error::ext::BoxedError;
35use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
36use common_meta::cluster::{
37    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
38};
39use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
40use common_meta::error::{
41    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
42};
43use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
44use common_meta::kv_backend::KvBackendRef;
45use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
46use common_meta::range_stream::PaginationStream;
47use common_meta::rpc::KeyValue;
48use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
49use common_meta::rpc::procedure::{
50    AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
51    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
52    RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
53};
54use common_meta::rpc::store::{
55    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
56    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
57    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
58};
59use common_telemetry::info;
60use futures::TryStreamExt;
61use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
62use procedure::Client as ProcedureClient;
63use snafu::{OptionExt, ResultExt};
64use store::Client as StoreClient;
65
66pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
67use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
68use crate::error::{
69    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
70    Result,
71};
72
/// The member id of a meta client, used to identify it to the metasrv.
pub type Id = u64;

/// Max retries when asking for the metasrv leader address.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Max retries when submitting a DDL task.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Max retries for cluster-info client requests.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
/// The default ddl timeout for each request.
const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
79
/// Builder for [MetaClient], selecting which sub-clients (heartbeat, store,
/// procedure, cluster info) are enabled and which channel managers they use.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    // Member id of the client; frontends pass 0 (no member id needed).
    id: Id,
    // Role (Frontend/Datanode/Flownode) this client reports to the metasrv.
    role: Role,
    enable_heartbeat: bool,
    enable_store: bool,
    enable_procedure: bool,
    enable_access_cluster_info: bool,
    // Optional client used to manage region followers.
    region_follower: Option<RegionFollowerClientRef>,
    // Default channel manager shared by sub-clients unless overridden below.
    channel_manager: Option<ChannelManager>,
    // Dedicated channel manager for DDL (procedure) requests.
    ddl_channel_manager: Option<ChannelManager>,
    /// The default ddl timeout for each request.
    ddl_timeout: Option<Duration>,
    // Dedicated channel manager for the heartbeat stream.
    heartbeat_channel_manager: Option<ChannelManager>,
}
95
96impl MetaClientBuilder {
97    pub fn new(member_id: u64, role: Role) -> Self {
98        Self {
99            id: member_id,
100            role,
101            ..Default::default()
102        }
103    }
104
105    /// Returns the role of Frontend's default options.
106    pub fn frontend_default_options() -> Self {
107        // Frontend does not need a member id.
108        Self::new(0, Role::Frontend)
109            .enable_store()
110            .enable_heartbeat()
111            .enable_procedure()
112            .enable_access_cluster_info()
113    }
114
115    /// Returns the role of Datanode's default options.
116    pub fn datanode_default_options(member_id: u64) -> Self {
117        Self::new(member_id, Role::Datanode)
118            .enable_store()
119            .enable_heartbeat()
120    }
121
122    /// Returns the role of Flownode's default options.
123    pub fn flownode_default_options(member_id: u64) -> Self {
124        Self::new(member_id, Role::Flownode)
125            .enable_store()
126            .enable_heartbeat()
127            .enable_procedure()
128            .enable_access_cluster_info()
129    }
130
131    pub fn enable_heartbeat(self) -> Self {
132        Self {
133            enable_heartbeat: true,
134            ..self
135        }
136    }
137
138    pub fn enable_store(self) -> Self {
139        Self {
140            enable_store: true,
141            ..self
142        }
143    }
144
145    pub fn enable_procedure(self) -> Self {
146        Self {
147            enable_procedure: true,
148            ..self
149        }
150    }
151
152    pub fn enable_access_cluster_info(self) -> Self {
153        Self {
154            enable_access_cluster_info: true,
155            ..self
156        }
157    }
158
159    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
160        Self {
161            channel_manager: Some(channel_manager),
162            ..self
163        }
164    }
165
166    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
167        Self {
168            ddl_channel_manager: Some(channel_manager),
169            ..self
170        }
171    }
172
173    pub fn ddl_timeout(self, timeout: Duration) -> Self {
174        Self {
175            ddl_timeout: Some(timeout),
176            ..self
177        }
178    }
179
180    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
181        Self {
182            heartbeat_channel_manager: Some(channel_manager),
183            ..self
184        }
185    }
186
187    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
188        Self {
189            region_follower: Some(region_follower),
190            ..self
191        }
192    }
193
194    pub fn build(self) -> MetaClient {
195        let mgr = self.channel_manager.unwrap_or_default();
196        let heartbeat_channel_manager = self
197            .heartbeat_channel_manager
198            .clone()
199            .unwrap_or_else(|| mgr.clone());
200
201        let heartbeat = self.enable_heartbeat.then(|| {
202            if self.heartbeat_channel_manager.is_some() {
203                info!("Enable heartbeat channel using the heartbeat channel manager.");
204            }
205
206            HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
207        });
208        let store = self
209            .enable_store
210            .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
211        let procedure = self.enable_procedure.then(|| {
212            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
213            ProcedureClient::new(
214                self.id,
215                self.role,
216                mgr,
217                DEFAULT_SUBMIT_DDL_MAX_RETRY,
218                self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
219            )
220        });
221        let cluster = self
222            .enable_access_cluster_info
223            .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
224        let region_follower = self.region_follower.clone();
225
226        MetaClient {
227            id: self.id,
228            channel_manager: mgr.clone(),
229            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
230                self.id,
231                self.role,
232                DEFAULT_ASK_LEADER_MAX_RETRY,
233                heartbeat_channel_manager,
234            )),
235            heartbeat,
236            store,
237            procedure,
238            cluster,
239            region_follower,
240        }
241    }
242}
243
/// The client for accessing the metasrv, composed of optional sub-clients
/// for heartbeat, the KV store, procedures, and cluster information.
#[derive(Debug)]
pub struct MetaClient {
    // Member id of this client.
    id: Id,
    // Default channel manager; its config is logged on `start`.
    channel_manager: ChannelManager,
    // Factory producing the leader provider used by leader-aware sub-clients.
    leader_provider_factory: LeaderProviderFactoryRef,
    // Sub-clients below are `None` unless enabled via `MetaClientBuilder`.
    heartbeat: Option<HeartbeatClient>,
    store: Option<StoreClient>,
    procedure: Option<ProcedureClient>,
    cluster: Option<ClusterClient>,
    region_follower: Option<RegionFollowerClientRef>,
}
255
256impl MetaClient {
257    pub fn new(id: Id, role: Role) -> Self {
258        Self {
259            id,
260            channel_manager: ChannelManager::default(),
261            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
262                id,
263                role,
264                DEFAULT_ASK_LEADER_MAX_RETRY,
265                ChannelManager::default(),
266            )),
267            heartbeat: None,
268            store: None,
269            procedure: None,
270            cluster: None,
271            region_follower: None,
272        }
273    }
274}
275
/// A shared reference to a [RegionFollowerClient].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Adds a follower for the given region.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Removes a follower from the given region.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Adds a follower for the given table.
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Removes a follower from the given table.
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client against the given metasrv urls.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a custom [LeaderProvider].
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
293
#[async_trait::async_trait]
impl ProcedureExecutor for MetaClient {
    /// Submits a DDL task, boxing any client error into the common meta
    /// error type expected by the [ProcedureExecutor] trait.
    async fn submit_ddl_task(
        &self,
        _ctx: &ExecutorContext,
        request: SubmitDdlTaskRequest,
    ) -> MetaResult<SubmitDdlTaskResponse> {
        self.submit_ddl_task(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Submits a region migration task via the inherent `migrate_region`.
    async fn migrate_region(
        &self,
        _ctx: &ExecutorContext,
        request: MigrateRegionRequest,
    ) -> MetaResult<MigrateRegionResponse> {
        self.migrate_region(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Submits a reconcile request via the inherent `reconcile`.
    async fn reconcile(
        &self,
        _ctx: &ExecutorContext,
        request: ReconcileRequest,
    ) -> MetaResult<ReconcileResponse> {
        self.reconcile(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Dispatches a region/table follower management request to the
    /// configured region follower client. Fails with `Unsupported` when no
    /// such client was set on this [MetaClient].
    async fn manage_region_follower(
        &self,
        _ctx: &ExecutorContext,
        request: ManageRegionFollowerRequest,
    ) -> MetaResult<()> {
        if let Some(region_follower) = &self.region_follower {
            match request {
                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
                    region_follower
                        .add_region_follower(add_region_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::RemoveRegionFollower(
                    remove_region_follower_request,
                ) => {
                    region_follower
                        .remove_region_follower(remove_region_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
                    region_follower
                        .add_table_follower(add_table_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
                    region_follower
                        .remove_table_follower(remove_table_follower_request)
                        .await
                }
            }
            // Convert the match result as a whole, regardless of variant.
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
        } else {
            UnsupportedSnafu {
                operation: "manage_region_follower",
            }
            .fail()
        }
    }

    /// Queries the state of a procedure by its id.
    async fn query_procedure_state(
        &self,
        _ctx: &ExecutorContext,
        pid: &str,
    ) -> MetaResult<ProcedureStateResponse> {
        self.query_procedure_state(pid)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Lists all procedures known to the metasrv. Fails if the procedure
    /// sub-client was not enabled.
    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
        self.procedure_client()
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?
            .list_procedures()
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }
}
390
391// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
392#[allow(deprecated)]
393#[async_trait::async_trait]
394impl ClusterInfo for MetaClient {
395    type Error = Error;
396
397    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
398        let cluster_client = self.cluster_client()?;
399
400        let (get_metasrv_nodes, nodes_key_prefix) = match role {
401            None => (true, Some(NodeInfoKey::key_prefix())),
402            Some(ClusterRole::Metasrv) => (true, None),
403            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
404        };
405
406        let mut nodes = if get_metasrv_nodes {
407            let last_activity_ts = -1; // Metasrv does not provide this information.
408
409            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
410                cluster_client.get_metasrv_peers().await?;
411            followers
412                .into_iter()
413                .map(|node| {
414                    if let Some(node_info) = node.info {
415                        NodeInfo {
416                            peer: node.peer.unwrap_or_default(),
417                            last_activity_ts,
418                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
419                            version: node_info.version,
420                            git_commit: node_info.git_commit,
421                            start_time_ms: node_info.start_time_ms,
422                            total_cpu_millicores: node_info.total_cpu_millicores,
423                            total_memory_bytes: node_info.total_memory_bytes,
424                            cpu_usage_millicores: node_info.cpu_usage_millicores,
425                            memory_usage_bytes: node_info.memory_usage_bytes,
426                            hostname: node_info.hostname,
427                        }
428                    } else {
429                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
430                        NodeInfo {
431                            peer: node.peer.unwrap_or_default(),
432                            last_activity_ts,
433                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
434                            version: node.version,
435                            git_commit: node.git_commit,
436                            start_time_ms: node.start_time_ms,
437                            total_cpu_millicores: node.cpus as i64,
438                            total_memory_bytes: node.memory_bytes as i64,
439                            cpu_usage_millicores: 0,
440                            memory_usage_bytes: 0,
441                            hostname: "".to_string(),
442                        }
443                    }
444                })
445                .chain(leader.into_iter().map(|node| {
446                    if let Some(node_info) = node.info {
447                        NodeInfo {
448                            peer: node.peer.unwrap_or_default(),
449                            last_activity_ts,
450                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
451                            version: node_info.version,
452                            git_commit: node_info.git_commit,
453                            start_time_ms: node_info.start_time_ms,
454                            total_cpu_millicores: node_info.total_cpu_millicores,
455                            total_memory_bytes: node_info.total_memory_bytes,
456                            cpu_usage_millicores: node_info.cpu_usage_millicores,
457                            memory_usage_bytes: node_info.memory_usage_bytes,
458                            hostname: node_info.hostname,
459                        }
460                    } else {
461                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
462                        NodeInfo {
463                            peer: node.peer.unwrap_or_default(),
464                            last_activity_ts,
465                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
466                            version: node.version,
467                            git_commit: node.git_commit,
468                            start_time_ms: node.start_time_ms,
469                            total_cpu_millicores: node.cpus as i64,
470                            total_memory_bytes: node.memory_bytes as i64,
471                            cpu_usage_millicores: 0,
472                            memory_usage_bytes: 0,
473                            hostname: "".to_string(),
474                        }
475                    }
476                }))
477                .collect::<Vec<_>>()
478        } else {
479            Vec::new()
480        };
481
482        if let Some(prefix) = nodes_key_prefix {
483            let req = RangeRequest::new().with_prefix(prefix);
484            let res = cluster_client.range(req).await?;
485            for kv in res.kvs {
486                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
487            }
488        }
489
490        Ok(nodes)
491    }
492
493    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
494        let cluster_kv_backend = Arc::new(self.cluster_client()?);
495        let range_prefix = DatanodeStatKey::prefix_key();
496        let req = RangeRequest::new().with_prefix(range_prefix);
497        let stream =
498            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
499        let mut datanode_stats = stream
500            .try_collect::<Vec<_>>()
501            .await
502            .context(ConvertMetaResponseSnafu)?;
503        let region_stats = datanode_stats
504            .iter_mut()
505            .flat_map(|datanode_stat| {
506                let last = datanode_stat.stats.pop();
507                last.map(|stat| stat.region_stats).unwrap_or_default()
508            })
509            .collect::<Vec<_>>();
510
511        Ok(region_stats)
512    }
513
514    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
515        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
516        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
517        let flow_state_manager = FlowStateManager::new(cluster_backend);
518        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
519
520        Ok(res.map(|r| r.into()))
521    }
522}
523
524fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
525    DatanodeStatValue::try_from(kv.value)
526        .map_err(BoxedError::new)
527        .context(ExternalSnafu)
528}
529
impl MetaClient {
    /// Starts the client against the given metasrv `urls`: builds a leader
    /// provider from them, then starts every enabled sub-client.
    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]> + Clone,
    {
        info!("MetaClient channel config: {:?}", self.channel_config());

        let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
        let leader_provider = self.leader_provider_factory.create(&urls);

        self.start_with(leader_provider, urls).await
    }

    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
    pub(crate) async fn start_with<U, A>(
        &mut self,
        leader_provider: LeaderProviderRef,
        peers: A,
    ) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]> + Clone,
    {
        // Each enabled sub-client is started in turn; leader-aware clients
        // take the leader provider, the store client takes the peer list.
        if let Some(client) = &self.region_follower {
            info!("Starting region follower client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &self.heartbeat {
            info!("Starting heartbeat client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &mut self.store {
            info!("Starting store client ...");
            client.start(peers.clone()).await?;
        }

        if let Some(client) = &self.procedure {
            info!("Starting procedure client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &mut self.cluster {
            info!("Starting cluster client ...");
            client.start_with(leader_provider).await?;
        }
        Ok(())
    }

    /// Ask the leader address of `metasrv`, and the heartbeat component
    /// needs to create a bidirectional streaming to the leader.
    pub async fn ask_leader(&self) -> Result<String> {
        self.heartbeat_client()?.ask_leader().await
    }

    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
    /// other end is the leader of `metasrv`.
    ///
    /// The `datanode` needs to use the sender to continuously send heartbeat
    /// packets (some self-state data), and the receiver can receive a response
    /// from "metasrv" (which may contain some scheduling instructions).
    ///
    /// Returns the heartbeat sender, stream, and configuration received from Metasrv.
    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
        self.heartbeat_client()?.heartbeat().await
    }

    /// Range gets the keys in the range from the key-value store.
    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        self.store_client()?
            .range(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// Put puts the given key into the key-value store.
    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
        self.store_client()?
            .put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchGet atomically get values by the given keys from the key-value store.
    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        self.store_client()?
            .batch_get(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchPut atomically puts the given keys into the key-value store.
    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
        self.store_client()?
            .batch_put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchDelete atomically deletes the given keys from the key-value store.
    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        self.store_client()?
            .batch_delete(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// CompareAndPut atomically puts the value to the given updated
    /// value if the current value == the expected value.
    pub async fn compare_and_put(
        &self,
        req: CompareAndPutRequest,
    ) -> Result<CompareAndPutResponse> {
        self.store_client()?
            .compare_and_put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// DeleteRange deletes the given range from the key-value store.
    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        self.store_client()?
            .delete_range(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// Query the procedure state by its id.
    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
        self.procedure_client()?.query_procedure_state(pid).await
    }

    /// Submit a region migration task.
    pub async fn migrate_region(
        &self,
        request: MigrateRegionRequest,
    ) -> Result<MigrateRegionResponse> {
        self.procedure_client()?
            .migrate_region(
                request.region_id,
                request.from_peer,
                request.to_peer,
                request.timeout,
            )
            .await
    }

    /// Reconcile the procedure state.
    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
        self.procedure_client()?.reconcile(request).await
    }

    /// Submit a DDL task
    pub async fn submit_ddl_task(
        &self,
        req: SubmitDdlTaskRequest,
    ) -> Result<SubmitDdlTaskResponse> {
        let res = self
            .procedure_client()?
            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)?;

        Ok(res)
    }

    /// Returns the heartbeat sub-client, or `NotStarted` if it was not enabled.
    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
        self.heartbeat.clone().context(NotStartedSnafu {
            name: "heartbeat_client",
        })
    }

    /// Returns the store sub-client, or `NotStarted` if it was not enabled.
    pub fn store_client(&self) -> Result<StoreClient> {
        self.store.clone().context(NotStartedSnafu {
            name: "store_client",
        })
    }

    /// Returns the procedure sub-client, or `NotStarted` if it was not enabled.
    pub fn procedure_client(&self) -> Result<ProcedureClient> {
        self.procedure.clone().context(NotStartedSnafu {
            name: "procedure_client",
        })
    }

    /// Returns the cluster sub-client, or `NotStarted` if it was not enabled.
    pub fn cluster_client(&self) -> Result<ClusterClient> {
        self.cluster.clone().context(NotStartedSnafu {
            name: "cluster_client",
        })
    }

    /// Returns the configuration of the default channel manager.
    pub fn channel_config(&self) -> &ChannelConfig {
        self.channel_manager.config()
    }

    /// Returns the member id of this client.
    pub fn id(&self) -> Id {
        self.id
    }
}
738
739#[cfg(test)]
740mod tests {
741    use std::sync::atomic::{AtomicUsize, Ordering};
742
743    use api::v1::meta::{HeartbeatRequest, Peer};
744    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
745    use rand::Rng;
746
747    use super::*;
748    use crate::error;
749    use crate::mocks::{self, MockMetaContext};
750
751    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
752
    /// Test harness bundling a [MetaClient] and its mock metasrv context
    /// under a namespace so tests do not interfere with each other.
    struct TestClient {
        // Namespace used to prefix every key written by this client.
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }
758
    impl TestClient {
        /// Creates a test client backed by a mock metasrv with a memstore.
        async fn new(ns: impl Into<String>) -> Self {
            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds a namespaced key: `<TEST_KEY_PREFIX>-<ns>-<name>`.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Writes ten `key-i` / `value-i` fixture pairs.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("{}-{}", "value", i).into_bytes())
                    .with_prev_kv();
                let res = self.client.put(req).await;
                let _ = res.unwrap();
            }
        }

        /// Deletes every key under this client's namespace prefix.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            let res = self.client.delete_range(req).await;
            let _ = res.unwrap();
        }

        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }
801
802    async fn new_client(ns: impl Into<String>) -> TestClient {
803        let client = TestClient::new(ns).await;
804        client.clear_data().await;
805        client
806    }
807
808    #[tokio::test]
809    async fn test_meta_client_builder() {
810        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
811
812        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
813            .enable_heartbeat()
814            .build();
815        let _ = meta_client.heartbeat_client().unwrap();
816        assert!(meta_client.store_client().is_err());
817        meta_client.start(urls).await.unwrap();
818
819        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
820        assert!(meta_client.heartbeat_client().is_err());
821        assert!(meta_client.store_client().is_err());
822        meta_client.start(urls).await.unwrap();
823
824        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
825            .enable_store()
826            .build();
827        assert!(meta_client.heartbeat_client().is_err());
828        let _ = meta_client.store_client().unwrap();
829        meta_client.start(urls).await.unwrap();
830
831        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
832            .enable_heartbeat()
833            .enable_store()
834            .build();
835        assert_eq!(2, meta_client.id());
836        assert_eq!(2, meta_client.id());
837        let _ = meta_client.heartbeat_client().unwrap();
838        let _ = meta_client.store_client().unwrap();
839        meta_client.start(urls).await.unwrap();
840    }
841
842    #[tokio::test]
843    async fn test_not_start_heartbeat_client() {
844        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
845        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
846            .enable_store()
847            .build();
848        meta_client.start(urls).await.unwrap();
849        let res = meta_client.ask_leader().await;
850        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
851    }
852
853    #[tokio::test]
854    async fn test_not_start_store_client() {
855        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
856        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
857            .enable_heartbeat()
858            .build();
859
860        meta_client.start(urls).await.unwrap();
861        let res = meta_client.put(PutRequest::default()).await;
862        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
863    }
864
865    #[tokio::test]
866    async fn test_ask_leader() {
867        let tc = new_client("test_ask_leader").await;
868        tc.client.ask_leader().await.unwrap();
869    }
870
    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        // Open the bidirectional heartbeat stream; `_config` is unused here.
        let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
        // send heartbeats

        // Counts how many requests the sender task actually pushed.
        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        // `sender` is moved into the task and dropped when the loop ends,
        // which should close the client side of the stream so the receiver
        // loop below can observe end-of-stream — TODO confirm against the
        // heartbeat sender implementation.
        let _handle = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        // Drain responses until the stream ends, counting each one.
        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let handle = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        handle.await.unwrap();
        //+1 for the initial response
        // NOTE(review): assumes the server answers one response per request
        // plus a single initial response on stream setup — verify against the
        // metasrv heartbeat handler.
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }
908
909    #[tokio::test]
910    async fn test_range_get() {
911        let tc = new_client("test_range_get").await;
912        tc.gen_data().await;
913
914        let key = tc.key("key-0");
915        let req = RangeRequest::new().with_key(key.as_slice());
916        let res = tc.client.range(req).await;
917        let mut kvs = res.unwrap().take_kvs();
918        assert_eq!(1, kvs.len());
919        let mut kv = kvs.pop().unwrap();
920        assert_eq!(key, kv.take_key());
921        assert_eq!(b"value-0".to_vec(), kv.take_value());
922    }
923
924    #[tokio::test]
925    async fn test_range_get_prefix() {
926        let tc = new_client("test_range_get_prefix").await;
927        tc.gen_data().await;
928
929        let req = RangeRequest::new().with_prefix(tc.key("key-"));
930        let res = tc.client.range(req).await;
931        let kvs = res.unwrap().take_kvs();
932        assert_eq!(10, kvs.len());
933        for (i, mut kv) in kvs.into_iter().enumerate() {
934            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
935            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
936        }
937    }
938
939    #[tokio::test]
940    async fn test_range() {
941        let tc = new_client("test_range").await;
942        tc.gen_data().await;
943
944        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
945        let res = tc.client.range(req).await;
946        let kvs = res.unwrap().take_kvs();
947        assert_eq!(3, kvs.len());
948        for (i, mut kv) in kvs.into_iter().enumerate() {
949            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
950            assert_eq!(
951                format!("{}-{}", "value", i + 5).into_bytes(),
952                kv.take_value()
953            );
954        }
955    }
956
957    #[tokio::test]
958    async fn test_range_keys_only() {
959        let tc = new_client("test_range_keys_only").await;
960        tc.gen_data().await;
961
962        let req = RangeRequest::new()
963            .with_range(tc.key("key-5"), tc.key("key-8"))
964            .with_keys_only();
965        let res = tc.client.range(req).await;
966        let kvs = res.unwrap().take_kvs();
967        assert_eq!(3, kvs.len());
968        for (i, mut kv) in kvs.into_iter().enumerate() {
969            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
970            assert!(kv.take_value().is_empty());
971        }
972    }
973
974    #[tokio::test]
975    async fn test_put() {
976        let tc = new_client("test_put").await;
977
978        let req = PutRequest::new()
979            .with_key(tc.key("key"))
980            .with_value(b"value".to_vec());
981        let res = tc.client.put(req).await;
982        assert!(res.unwrap().prev_kv.is_none());
983    }
984
985    #[tokio::test]
986    async fn test_put_with_prev_kv() {
987        let tc = new_client("test_put_with_prev_kv").await;
988
989        let key = tc.key("key");
990        let req = PutRequest::new()
991            .with_key(key.as_slice())
992            .with_value(b"value".to_vec())
993            .with_prev_kv();
994        let res = tc.client.put(req).await;
995        assert!(res.unwrap().prev_kv.is_none());
996
997        let req = PutRequest::new()
998            .with_key(key.as_slice())
999            .with_value(b"value1".to_vec())
1000            .with_prev_kv();
1001        let res = tc.client.put(req).await;
1002        let mut kv = res.unwrap().prev_kv.unwrap();
1003        assert_eq!(key, kv.take_key());
1004        assert_eq!(b"value".to_vec(), kv.take_value());
1005    }
1006
1007    #[tokio::test]
1008    async fn test_batch_put() {
1009        let tc = new_client("test_batch_put").await;
1010
1011        let mut req = BatchPutRequest::new();
1012        for i in 0..275 {
1013            req = req.add_kv(
1014                tc.key(&format!("key-{}", i)),
1015                format!("value-{}", i).into_bytes(),
1016            );
1017        }
1018
1019        let res = tc.client.batch_put(req).await;
1020        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1021
1022        let req = RangeRequest::new().with_prefix(tc.key("key-"));
1023        let res = tc.client.range(req).await;
1024        let kvs = res.unwrap().take_kvs();
1025        assert_eq!(275, kvs.len());
1026    }
1027
1028    #[tokio::test]
1029    async fn test_batch_get() {
1030        let tc = new_client("test_batch_get").await;
1031        tc.gen_data().await;
1032
1033        let mut req = BatchGetRequest::default();
1034        for i in 0..256 {
1035            req = req.add_key(tc.key(&format!("key-{}", i)));
1036        }
1037        let res = tc.client.batch_get(req).await.unwrap();
1038        assert_eq!(10, res.kvs.len());
1039
1040        let req = BatchGetRequest::default()
1041            .add_key(tc.key("key-1"))
1042            .add_key(tc.key("key-999"));
1043        let res = tc.client.batch_get(req).await.unwrap();
1044        assert_eq!(1, res.kvs.len());
1045    }
1046
1047    #[tokio::test]
1048    async fn test_batch_put_with_prev_kv() {
1049        let tc = new_client("test_batch_put_with_prev_kv").await;
1050
1051        let key = tc.key("key");
1052        let key2 = tc.key("key2");
1053        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1054        let res = tc.client.batch_put(req).await;
1055        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1056
1057        let req = BatchPutRequest::new()
1058            .add_kv(key.as_slice(), b"value-".to_vec())
1059            .add_kv(key2.as_slice(), b"value2-".to_vec())
1060            .with_prev_kv();
1061        let res = tc.client.batch_put(req).await;
1062        let mut kvs = res.unwrap().take_prev_kvs();
1063        assert_eq!(1, kvs.len());
1064        let mut kv = kvs.pop().unwrap();
1065        assert_eq!(key, kv.take_key());
1066        assert_eq!(b"value".to_vec(), kv.take_value());
1067    }
1068
1069    #[tokio::test]
1070    async fn test_compare_and_put() {
1071        let tc = new_client("test_compare_and_put").await;
1072
1073        let key = tc.key("key");
1074        let req = CompareAndPutRequest::new()
1075            .with_key(key.as_slice())
1076            .with_expect(b"expect".to_vec())
1077            .with_value(b"value".to_vec());
1078        let res = tc.client.compare_and_put(req).await;
1079        assert!(!res.unwrap().is_success());
1080
1081        // create if absent
1082        let req = CompareAndPutRequest::new()
1083            .with_key(key.as_slice())
1084            .with_value(b"value".to_vec());
1085        let res = tc.client.compare_and_put(req).await;
1086        let mut res = res.unwrap();
1087        assert!(res.is_success());
1088        assert!(res.take_prev_kv().is_none());
1089
1090        // compare and put fail
1091        let req = CompareAndPutRequest::new()
1092            .with_key(key.as_slice())
1093            .with_expect(b"not_eq".to_vec())
1094            .with_value(b"value2".to_vec());
1095        let res = tc.client.compare_and_put(req).await;
1096        let mut res = res.unwrap();
1097        assert!(!res.is_success());
1098        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1099
1100        // compare and put success
1101        let req = CompareAndPutRequest::new()
1102            .with_key(key.as_slice())
1103            .with_expect(b"value".to_vec())
1104            .with_value(b"value2".to_vec());
1105        let res = tc.client.compare_and_put(req).await;
1106        let mut res = res.unwrap();
1107        assert!(res.is_success());
1108
1109        // If compare-and-put is success, previous value doesn't need to be returned.
1110        assert!(res.take_prev_kv().is_none());
1111    }
1112
1113    #[tokio::test]
1114    async fn test_delete_with_key() {
1115        let tc = new_client("test_delete_with_key").await;
1116        tc.gen_data().await;
1117
1118        let req = DeleteRangeRequest::new()
1119            .with_key(tc.key("key-0"))
1120            .with_prev_kv();
1121        let res = tc.client.delete_range(req).await;
1122        let mut res = res.unwrap();
1123        assert_eq!(1, res.deleted());
1124        let mut kvs = res.take_prev_kvs();
1125        assert_eq!(1, kvs.len());
1126        let mut kv = kvs.pop().unwrap();
1127        assert_eq!(b"value-0".to_vec(), kv.take_value());
1128    }
1129
1130    #[tokio::test]
1131    async fn test_delete_with_prefix() {
1132        let tc = new_client("test_delete_with_prefix").await;
1133        tc.gen_data().await;
1134
1135        let req = DeleteRangeRequest::new()
1136            .with_prefix(tc.key("key-"))
1137            .with_prev_kv();
1138        let res = tc.client.delete_range(req).await;
1139        let mut res = res.unwrap();
1140        assert_eq!(10, res.deleted());
1141        let kvs = res.take_prev_kvs();
1142        assert_eq!(10, kvs.len());
1143        for (i, mut kv) in kvs.into_iter().enumerate() {
1144            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1145        }
1146    }
1147
1148    #[tokio::test]
1149    async fn test_delete_with_range() {
1150        let tc = new_client("test_delete_with_range").await;
1151        tc.gen_data().await;
1152
1153        let req = DeleteRangeRequest::new()
1154            .with_range(tc.key("key-2"), tc.key("key-7"))
1155            .with_prev_kv();
1156        let res = tc.client.delete_range(req).await;
1157        let mut res = res.unwrap();
1158        assert_eq!(5, res.deleted());
1159        let kvs = res.take_prev_kvs();
1160        assert_eq!(5, kvs.len());
1161        for (i, mut kv) in kvs.into_iter().enumerate() {
1162            assert_eq!(
1163                format!("{}-{}", "value", i + 2).into_bytes(),
1164                kv.take_value()
1165            );
1166        }
1167    }
1168
    /// No-op decoder for `PaginationStream`: accepts every `KeyValue` and
    /// discards it, so tests can count streamed items without inspecting
    /// their payloads.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }
1172
1173    #[tokio::test]
1174    async fn test_cluster_client_adaptive_range() {
1175        let tx = new_client("test_cluster_client").await;
1176        let in_memory = tx.in_memory().unwrap();
1177        let cluster_client = tx.client.cluster_client().unwrap();
1178        let mut rng = rand::rng();
1179
1180        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1181        for i in 0..10 {
1182            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1183            in_memory
1184                .put(
1185                    PutRequest::new()
1186                        .with_key(format!("__prefix/{i}").as_bytes())
1187                        .with_value(data.clone()),
1188                )
1189                .await
1190                .unwrap();
1191        }
1192
1193        let req = RangeRequest::new().with_prefix(b"__prefix/");
1194        let stream =
1195            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1196
1197        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1198        assert_eq!(10, res.len());
1199    }
1200}