// meta_client/client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod config;
17pub mod heartbeat;
18mod load_balance;
19mod procedure;
20
21mod cluster;
22mod store;
23mod util;
24
25use std::fmt::Debug;
26use std::sync::Arc;
27use std::time::Duration;
28
29use api::v1::meta::{
30    MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
31};
32pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
33use cluster::Client as ClusterClient;
34pub use cluster::ClusterKvBackend;
35use common_error::ext::BoxedError;
36use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
37use common_meta::cluster::{
38    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
39};
40use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
41use common_meta::error::{
42    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
43};
44use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
45use common_meta::kv_backend::KvBackendRef;
46use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
47use common_meta::range_stream::PaginationStream;
48use common_meta::rpc::KeyValue;
49use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
50use common_meta::rpc::procedure::{
51    AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
52    GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
53    ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
54};
55use common_meta::rpc::store::{
56    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
57    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
58    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
59};
60use common_options::plugin_options::PluginOptionsDeserializer;
61use common_telemetry::info;
62use config::Client as ConfigClient;
63use futures::TryStreamExt;
64use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
65use procedure::Client as ProcedureClient;
66use serde::de::DeserializeOwned;
67use snafu::{OptionExt, ResultExt};
68use store::Client as StoreClient;
69
70pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
71use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
72use crate::error::{
73    ConvertMetaConfigSnafu, ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error,
74    GetFlowStatSnafu, NotStartedSnafu, Result,
75};
76
/// Identifier of a meta client: the member id of the node it serves.
pub type Id = u64;

/// Max retries when asking for the metasrv leader's address.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Max retries when submitting a DDL task.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Max retries for cluster-info requests.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
/// Default timeout applied to each DDL request when none is configured.
const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
83
/// Builder for [MetaClient]; each `enable_*` flag turns on the
/// corresponding sub-client when [MetaClientBuilder::build] is called.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    // Member id of the node (Frontend passes 0 — it needs no member id).
    id: Id,
    role: Role,
    enable_heartbeat: bool,
    enable_store: bool,
    enable_procedure: bool,
    enable_access_cluster_info: bool,
    // Optional client for managing region followers.
    region_follower: Option<RegionFollowerClientRef>,
    // Channel manager shared by all sub-clients unless a dedicated one below applies.
    channel_manager: Option<ChannelManager>,
    // Dedicated channel manager for DDL (procedure) requests.
    ddl_channel_manager: Option<ChannelManager>,
    /// The default ddl timeout for each request.
    ddl_timeout: Option<Duration>,
    // Dedicated channel manager for the heartbeat stream.
    heartbeat_channel_manager: Option<ChannelManager>,
}
99
100impl MetaClientBuilder {
101    pub fn new(member_id: u64, role: Role) -> Self {
102        Self {
103            id: member_id,
104            role,
105            ..Default::default()
106        }
107    }
108
109    /// Returns the role of Frontend's default options.
110    pub fn frontend_default_options() -> Self {
111        // Frontend does not need a member id.
112        Self::new(0, Role::Frontend)
113            .enable_store()
114            .enable_heartbeat()
115            .enable_procedure()
116            .enable_access_cluster_info()
117    }
118
119    /// Returns the role of Datanode's default options.
120    pub fn datanode_default_options(member_id: u64) -> Self {
121        Self::new(member_id, Role::Datanode)
122            .enable_store()
123            .enable_heartbeat()
124    }
125
126    /// Returns the role of Flownode's default options.
127    pub fn flownode_default_options(member_id: u64) -> Self {
128        Self::new(member_id, Role::Flownode)
129            .enable_store()
130            .enable_heartbeat()
131            .enable_procedure()
132            .enable_access_cluster_info()
133    }
134
135    pub fn enable_heartbeat(self) -> Self {
136        Self {
137            enable_heartbeat: true,
138            ..self
139        }
140    }
141
142    pub fn enable_store(self) -> Self {
143        Self {
144            enable_store: true,
145            ..self
146        }
147    }
148
149    pub fn enable_procedure(self) -> Self {
150        Self {
151            enable_procedure: true,
152            ..self
153        }
154    }
155
156    pub fn enable_access_cluster_info(self) -> Self {
157        Self {
158            enable_access_cluster_info: true,
159            ..self
160        }
161    }
162
163    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
164        Self {
165            channel_manager: Some(channel_manager),
166            ..self
167        }
168    }
169
170    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
171        Self {
172            ddl_channel_manager: Some(channel_manager),
173            ..self
174        }
175    }
176
177    pub fn ddl_timeout(self, timeout: Duration) -> Self {
178        Self {
179            ddl_timeout: Some(timeout),
180            ..self
181        }
182    }
183
184    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
185        Self {
186            heartbeat_channel_manager: Some(channel_manager),
187            ..self
188        }
189    }
190
191    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
192        Self {
193            region_follower: Some(region_follower),
194            ..self
195        }
196    }
197
198    pub fn build(self) -> MetaClient {
199        let mgr = self.channel_manager.unwrap_or_default();
200        let heartbeat_channel_manager = self
201            .heartbeat_channel_manager
202            .clone()
203            .unwrap_or_else(|| mgr.clone());
204
205        let heartbeat = self.enable_heartbeat.then(|| {
206            if self.heartbeat_channel_manager.is_some() {
207                info!("Enable heartbeat channel using the heartbeat channel manager.");
208            }
209
210            HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
211        });
212        let config = self
213            .enable_heartbeat
214            .then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
215        let store = self
216            .enable_store
217            .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
218        let procedure = self.enable_procedure.then(|| {
219            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
220            ProcedureClient::new(
221                self.id,
222                self.role,
223                mgr,
224                DEFAULT_SUBMIT_DDL_MAX_RETRY,
225                self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
226            )
227        });
228        let cluster = self
229            .enable_access_cluster_info
230            .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
231        let region_follower = self.region_follower.clone();
232
233        MetaClient {
234            id: self.id,
235            channel_manager: mgr.clone(),
236            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
237                self.id,
238                self.role,
239                DEFAULT_ASK_LEADER_MAX_RETRY,
240                heartbeat_channel_manager,
241            )),
242            heartbeat,
243            config,
244            store,
245            procedure,
246            cluster,
247            region_follower,
248        }
249    }
250}
251
/// Client for talking to the metasrv. It multiplexes several optional
/// sub-clients (heartbeat, config, KV store, procedure, cluster info),
/// each enabled through [MetaClientBuilder]; `None` means "not enabled".
#[derive(Debug)]
pub struct MetaClient {
    // Member id of the node this client belongs to.
    id: Id,
    // Shared gRPC channel manager; its config is exposed via `channel_config()`.
    channel_manager: ChannelManager,
    // Factory used by `start()` to create a leader provider from peer urls.
    leader_provider_factory: LeaderProviderFactoryRef,
    heartbeat: Option<HeartbeatClient>,
    config: Option<ConfigClient>,
    store: Option<StoreClient>,
    procedure: Option<ProcedureClient>,
    cluster: Option<ClusterClient>,
    // Optional client for managing region followers.
    region_follower: Option<RegionFollowerClientRef>,
}
264
265impl MetaClient {
266    pub fn new(id: Id, role: Role) -> Self {
267        Self {
268            id,
269            channel_manager: ChannelManager::default(),
270            leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
271                id,
272                role,
273                DEFAULT_ASK_LEADER_MAX_RETRY,
274                ChannelManager::default(),
275            )),
276            heartbeat: None,
277            config: None,
278            store: None,
279            procedure: None,
280            cluster: None,
281            region_follower: None,
282        }
283    }
284}
285
/// Shared, thread-safe reference to a [RegionFollowerClient].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Adds a follower for the region described by `request`.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Removes a follower from the region described by `request`.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Adds a follower for the table described by `request`.
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Removes a follower from the table described by `request`.
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client against the given metasrv `urls`.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with an externally provided leader provider.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
303
// The `ProcedureExecutor` impl delegates to the inherent methods below,
// boxing any client-side error into `common_meta`'s external error type.
#[async_trait::async_trait]
impl ProcedureExecutor for MetaClient {
    /// Submits a DDL task; the executor context is unused by this client.
    async fn submit_ddl_task(
        &self,
        _ctx: &ExecutorContext,
        request: SubmitDdlTaskRequest,
    ) -> MetaResult<SubmitDdlTaskResponse> {
        self.submit_ddl_task(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Submits a region migration request.
    async fn migrate_region(
        &self,
        _ctx: &ExecutorContext,
        request: MigrateRegionRequest,
    ) -> MetaResult<MigrateRegionResponse> {
        self.migrate_region(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Submits a reconcile request.
    async fn reconcile(
        &self,
        _ctx: &ExecutorContext,
        request: ReconcileRequest,
    ) -> MetaResult<ReconcileResponse> {
        self.reconcile(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Dispatches a region-follower management request to the optional
    /// region follower client; fails with `Unsupported` when none is set.
    async fn manage_region_follower(
        &self,
        _ctx: &ExecutorContext,
        request: ManageRegionFollowerRequest,
    ) -> MetaResult<()> {
        if let Some(region_follower) = &self.region_follower {
            // The `match` is an expression; the shared error conversion below
            // applies to whichever variant was executed.
            match request {
                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
                    region_follower
                        .add_region_follower(add_region_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::RemoveRegionFollower(
                    remove_region_follower_request,
                ) => {
                    region_follower
                        .remove_region_follower(remove_region_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
                    region_follower
                        .add_table_follower(add_table_follower_request)
                        .await
                }
                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
                    region_follower
                        .remove_table_follower(remove_table_follower_request)
                        .await
                }
            }
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
        } else {
            UnsupportedSnafu {
                operation: "manage_region_follower",
            }
            .fail()
        }
    }

    /// Queries the state of the procedure identified by `pid`.
    async fn query_procedure_state(
        &self,
        _ctx: &ExecutorContext,
        pid: &str,
    ) -> MetaResult<ProcedureStateResponse> {
        self.query_procedure_state(pid)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Triggers GC for the regions described by `request`.
    async fn gc_regions(
        &self,
        _ctx: &ExecutorContext,
        request: GcRegionsRequest,
    ) -> MetaResult<GcResponse> {
        self.gc_regions(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Triggers GC for the table described by `request`.
    async fn gc_table(
        &self,
        _ctx: &ExecutorContext,
        request: GcTableRequest,
    ) -> MetaResult<GcResponse> {
        self.gc_table(request)
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }

    /// Lists all procedures known to the metasrv. Fetching the procedure
    /// client itself can fail (when not enabled), hence the extra `?`.
    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
        self.procedure_client()
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?
            .list_procedures()
            .await
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)
    }
}
422
423// TODO(zyy17): Allow deprecated fields for backward compatibility. Remove this when the deprecated fields are removed from the proto.
424#[allow(deprecated)]
425#[async_trait::async_trait]
426impl ClusterInfo for MetaClient {
427    type Error = Error;
428
429    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
430        let cluster_client = self.cluster_client()?;
431
432        let (get_metasrv_nodes, nodes_key_prefix) = match role {
433            None => (true, Some(NodeInfoKey::key_prefix())),
434            Some(ClusterRole::Metasrv) => (true, None),
435            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
436        };
437
438        let mut nodes = if get_metasrv_nodes {
439            let last_activity_ts = -1; // Metasrv does not provide this information.
440
441            let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
442                cluster_client.get_metasrv_peers().await?;
443            followers
444                .into_iter()
445                .map(|node| {
446                    if let Some(node_info) = node.info {
447                        NodeInfo {
448                            peer: node.peer.unwrap_or_default(),
449                            last_activity_ts,
450                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
451                            version: node_info.version,
452                            git_commit: node_info.git_commit,
453                            start_time_ms: node_info.start_time_ms,
454                            total_cpu_millicores: node_info.total_cpu_millicores,
455                            total_memory_bytes: node_info.total_memory_bytes,
456                            cpu_usage_millicores: node_info.cpu_usage_millicores,
457                            memory_usage_bytes: node_info.memory_usage_bytes,
458                            hostname: node_info.hostname,
459                        }
460                    } else {
461                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
462                        NodeInfo {
463                            peer: node.peer.unwrap_or_default(),
464                            last_activity_ts,
465                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
466                            version: node.version,
467                            git_commit: node.git_commit,
468                            start_time_ms: node.start_time_ms,
469                            total_cpu_millicores: node.cpus as i64,
470                            total_memory_bytes: node.memory_bytes as i64,
471                            cpu_usage_millicores: 0,
472                            memory_usage_bytes: 0,
473                            hostname: "".to_string(),
474                        }
475                    }
476                })
477                .chain(leader.into_iter().map(|node| {
478                    if let Some(node_info) = node.info {
479                        NodeInfo {
480                            peer: node.peer.unwrap_or_default(),
481                            last_activity_ts,
482                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
483                            version: node_info.version,
484                            git_commit: node_info.git_commit,
485                            start_time_ms: node_info.start_time_ms,
486                            total_cpu_millicores: node_info.total_cpu_millicores,
487                            total_memory_bytes: node_info.total_memory_bytes,
488                            cpu_usage_millicores: node_info.cpu_usage_millicores,
489                            memory_usage_bytes: node_info.memory_usage_bytes,
490                            hostname: node_info.hostname,
491                        }
492                    } else {
493                        // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
494                        NodeInfo {
495                            peer: node.peer.unwrap_or_default(),
496                            last_activity_ts,
497                            status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
498                            version: node.version,
499                            git_commit: node.git_commit,
500                            start_time_ms: node.start_time_ms,
501                            total_cpu_millicores: node.cpus as i64,
502                            total_memory_bytes: node.memory_bytes as i64,
503                            cpu_usage_millicores: 0,
504                            memory_usage_bytes: 0,
505                            hostname: "".to_string(),
506                        }
507                    }
508                }))
509                .collect::<Vec<_>>()
510        } else {
511            Vec::new()
512        };
513
514        if let Some(prefix) = nodes_key_prefix {
515            let req = RangeRequest::new().with_prefix(prefix);
516            let res = cluster_client.range(req).await?;
517            for kv in res.kvs {
518                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
519            }
520        }
521
522        Ok(nodes)
523    }
524
525    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
526        let cluster_kv_backend = Arc::new(self.cluster_client()?);
527        let range_prefix = DatanodeStatKey::prefix_key();
528        let req = RangeRequest::new().with_prefix(range_prefix);
529        let stream =
530            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
531        let mut datanode_stats = stream
532            .try_collect::<Vec<_>>()
533            .await
534            .context(ConvertMetaResponseSnafu)?;
535        let region_stats = datanode_stats
536            .iter_mut()
537            .flat_map(|datanode_stat| {
538                let last = datanode_stat.stats.pop();
539                last.map(|stat| stat.region_stats).unwrap_or_default()
540            })
541            .collect::<Vec<_>>();
542
543        Ok(region_stats)
544    }
545
546    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
547        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
548        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
549        let flow_state_manager = FlowStateManager::new(cluster_backend);
550        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
551
552        Ok(res.map(|r| r.into()))
553    }
554}
555
556fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
557    DatanodeStatValue::try_from(kv.value)
558        .map_err(BoxedError::new)
559        .context(ExternalSnafu)
560}
561
impl MetaClient {
    /// Starts every enabled sub-client against the given metasrv `urls`,
    /// building a leader provider from them via the factory.
    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]> + Clone,
    {
        info!("MetaClient channel config: {:?}", self.channel_config());

        let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
        let leader_provider = self.leader_provider_factory.create(&urls);

        self.start_with(leader_provider, urls).await
    }

    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
    ///
    /// Each enabled sub-client is started in turn; the store client uses the
    /// raw peer addresses while the others use the leader provider.
    pub(crate) async fn start_with<U, A>(
        &mut self,
        leader_provider: LeaderProviderRef,
        peers: A,
    ) -> Result<()>
    where
        U: AsRef<str>,
        A: AsRef<[U]> + Clone,
    {
        if let Some(client) = &self.region_follower {
            info!("Starting region follower client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &self.heartbeat {
            info!("Starting heartbeat client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &self.config {
            info!("Starting config client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &mut self.store {
            info!("Starting store client ...");
            client.start(peers.clone()).await?;
        }

        if let Some(client) = &self.procedure {
            info!("Starting procedure client ...");
            client.start_with(leader_provider.clone()).await?;
        }

        if let Some(client) = &mut self.cluster {
            info!("Starting cluster client ...");
            client.start_with(leader_provider).await?;
        }
        Ok(())
    }

    /// Ask the leader address of `metasrv`, and the heartbeat component
    /// needs to create a bidirectional streaming to the leader.
    pub async fn ask_leader(&self) -> Result<String> {
        self.heartbeat_client()?.ask_leader().await
    }

    /// Pulls configuration from the metasrv and deserializes its payload
    /// with the given `deserializer`.
    pub async fn pull_config<T, U>(&self, deserializer: T) -> Result<U>
    where
        T: PluginOptionsDeserializer<U>,
        U: DeserializeOwned,
    {
        let res = self.config_client()?.pull_config().await?;
        let v = deserializer
            .deserialize(&res.payload)
            .context(ConvertMetaConfigSnafu)?;
        Ok(v)
    }

    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
    /// other end is the leader of `metasrv`.
    ///
    /// The `datanode` needs to use the sender to continuously send heartbeat
    /// packets (some self-state data), and the receiver can receive a response
    /// from "metasrv" (which may contain some scheduling instructions).
    ///
    /// Returns the heartbeat sender, stream, and configuration received from Metasrv.
    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
        self.heartbeat_client()?.heartbeat().await
    }

    /// Range gets the keys in the range from the key-value store.
    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        self.store_client()?
            .range(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// Put puts the given key into the key-value store.
    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
        self.store_client()?
            .put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchGet atomically get values by the given keys from the key-value store.
    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        self.store_client()?
            .batch_get(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchPut atomically puts the given keys into the key-value store.
    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
        self.store_client()?
            .batch_put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// BatchDelete atomically deletes the given keys from the key-value store.
    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        self.store_client()?
            .batch_delete(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// CompareAndPut atomically puts the value to the given updated
    /// value if the current value == the expected value.
    pub async fn compare_and_put(
        &self,
        req: CompareAndPutRequest,
    ) -> Result<CompareAndPutResponse> {
        self.store_client()?
            .compare_and_put(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// DeleteRange deletes the given range from the key-value store.
    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        self.store_client()?
            .delete_range(req.into())
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)
    }

    /// Query the procedure state by its id.
    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
        self.procedure_client()?.query_procedure_state(pid).await
    }

    /// Submit a region migration task.
    pub async fn migrate_region(
        &self,
        request: MigrateRegionRequest,
    ) -> Result<MigrateRegionResponse> {
        self.procedure_client()?
            .migrate_region(
                request.region_id,
                request.from_peer,
                request.to_peer,
                request.timeout,
            )
            .await
    }

    /// Reconcile the procedure state.
    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
        self.procedure_client()?.reconcile(request).await
    }

    /// Manually trigger GC for specific regions.
    pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
        self.procedure_client()?.gc_regions(request).await
    }

    /// Manually trigger GC for a table (all its regions).
    pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
        self.procedure_client()?.gc_table(request).await
    }

    /// Submit a DDL task
    pub async fn submit_ddl_task(
        &self,
        req: SubmitDdlTaskRequest,
    ) -> Result<SubmitDdlTaskResponse> {
        let res = self
            .procedure_client()?
            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
            .await?
            .try_into()
            .context(ConvertMetaResponseSnafu)?;

        Ok(res)
    }

    /// Returns the heartbeat client, or a `NotStarted` error when heartbeat
    /// was not enabled on the builder.
    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
        self.heartbeat.clone().context(NotStartedSnafu {
            name: "heartbeat_client",
        })
    }

    /// Returns the config client, or a `NotStarted` error when it was not enabled.
    pub fn config_client(&self) -> Result<ConfigClient> {
        self.config.clone().context(NotStartedSnafu {
            name: "config_client",
        })
    }

    /// Returns the store client, or a `NotStarted` error when it was not enabled.
    pub fn store_client(&self) -> Result<StoreClient> {
        self.store.clone().context(NotStartedSnafu {
            name: "store_client",
        })
    }

    /// Returns the procedure client, or a `NotStarted` error when it was not enabled.
    pub fn procedure_client(&self) -> Result<ProcedureClient> {
        self.procedure.clone().context(NotStartedSnafu {
            name: "procedure_client",
        })
    }

    /// Returns the cluster client, or a `NotStarted` error when it was not enabled.
    pub fn cluster_client(&self) -> Result<ClusterClient> {
        self.cluster.clone().context(NotStartedSnafu {
            name: "cluster_client",
        })
    }

    /// Returns the configuration of the shared channel manager.
    pub fn channel_config(&self) -> &ChannelConfig {
        self.channel_manager.config()
    }

    /// Returns this client's member id.
    pub fn id(&self) -> Id {
        self.id
    }
}
803
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use api::v1::meta::{HeartbeatRequest, Peer};
    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
    use rand::Rng;

    use super::*;
    use crate::error;
    use crate::mocks::{self, MockMetaContext};

    /// Prefix applied to every key written by these tests so that different
    /// test namespaces (and a real etcd store, if used) don't collide.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// A `MetaClient` wired to a mock metasrv, plus the namespace used to
    /// isolate this test's keys.
    struct TestClient {
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }

    impl TestClient {
        async fn new(ns: impl Into<String>) -> Self {
            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds a namespaced key of the form `{prefix}-{ns}-{name}`.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Seeds ten `key-{i}` / `value-{i}` pairs used by range/delete tests.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("{}-{}", "value", i).into_bytes())
                    .with_prev_kv();
                let res = self.client.put(req).await;
                let _ = res.unwrap();
            }
        }

        /// Deletes every key under this test's namespace prefix.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            let res = self.client.delete_range(req).await;
            let _ = res.unwrap();
        }

        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }

    /// Creates a fresh `TestClient` and clears any leftover data in its
    /// namespace before the test runs.
    async fn new_client(ns: impl Into<String>) -> TestClient {
        let client = TestClient::new(ns).await;
        client.clear_data().await;
        client
    }

    #[tokio::test]
    async fn test_meta_client_builder() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();
        let _ = meta_client.heartbeat_client().unwrap();
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
        assert!(meta_client.heartbeat_client().is_err());
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        assert!(meta_client.heartbeat_client().is_err());
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
            .enable_heartbeat()
            .enable_store()
            .build();
        assert_eq!(2, meta_client.id());
        let _ = meta_client.heartbeat_client().unwrap();
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();
    }

    #[tokio::test]
    async fn test_not_start_heartbeat_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        meta_client.start(urls).await.unwrap();
        let res = meta_client.ask_leader().await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_not_start_store_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();

        meta_client.start(urls).await.unwrap();
        let res = meta_client.put(PutRequest::default()).await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let tc = new_client("test_ask_leader").await;
        tc.client.ask_leader().await.unwrap();
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
        // send heartbeats

        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        // The sender is moved into the task and dropped when it finishes,
        // which closes the stream and lets the receiver task terminate.
        let _handle = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let handle = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        handle.await.unwrap();
        //+1 for the initial response
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }

    #[tokio::test]
    async fn test_range_get() {
        let tc = new_client("test_range_get").await;
        tc.gen_data().await;

        let key = tc.key("key-0");
        let req = RangeRequest::new().with_key(key.as_slice());
        let res = tc.client.range(req).await;
        let mut kvs = res.unwrap().take_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_range_get_prefix() {
        let tc = new_client("test_range_get_prefix").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range() {
        let tc = new_client("test_range").await;
        tc.gen_data().await;

        // Range end is exclusive: expects key-5, key-6, key-7.
        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert_eq!(
                format!("{}-{}", "value", i + 5).into_bytes(),
                kv.take_value()
            );
        }
    }

    #[tokio::test]
    async fn test_range_keys_only() {
        let tc = new_client("test_range_keys_only").await;
        tc.gen_data().await;

        let req = RangeRequest::new()
            .with_range(tc.key("key-5"), tc.key("key-8"))
            .with_keys_only();
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert!(kv.take_value().is_empty());
        }
    }

    #[tokio::test]
    async fn test_put() {
        let tc = new_client("test_put").await;

        let req = PutRequest::new()
            .with_key(tc.key("key"))
            .with_value(b"value".to_vec());
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());
    }

    #[tokio::test]
    async fn test_put_with_prev_kv() {
        let tc = new_client("test_put_with_prev_kv").await;

        let key = tc.key("key");
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());

        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value1".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        let mut kv = res.unwrap().prev_kv.unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_batch_put() {
        let tc = new_client("test_batch_put").await;

        let mut req = BatchPutRequest::new();
        for i in 0..275 {
            req = req.add_kv(
                tc.key(&format!("key-{}", i)),
                format!("value-{}", i).into_bytes(),
            );
        }

        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(275, kvs.len());
    }

    #[tokio::test]
    async fn test_batch_get() {
        let tc = new_client("test_batch_get").await;
        tc.gen_data().await;

        // Only 10 keys exist; requesting 256 should return just those 10.
        let mut req = BatchGetRequest::default();
        for i in 0..256 {
            req = req.add_key(tc.key(&format!("key-{}", i)));
        }
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(10, res.kvs.len());

        let req = BatchGetRequest::default()
            .add_key(tc.key("key-1"))
            .add_key(tc.key("key-999"));
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(1, res.kvs.len());
    }

    #[tokio::test]
    async fn test_batch_put_with_prev_kv() {
        let tc = new_client("test_batch_put_with_prev_kv").await;

        let key = tc.key("key");
        let key2 = tc.key("key2");
        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        // Only `key` had a previous value, so exactly one prev kv comes back.
        let req = BatchPutRequest::new()
            .add_kv(key.as_slice(), b"value-".to_vec())
            .add_kv(key2.as_slice(), b"value2-".to_vec())
            .with_prev_kv();
        let res = tc.client.batch_put(req).await;
        let mut kvs = res.unwrap().take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let tc = new_client("test_compare_and_put").await;

        let key = tc.key("key");
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"expect".to_vec())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        assert!(!res.unwrap().is_success());

        // create if absent
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());
        assert!(res.take_prev_kv().is_none());

        // compare and put fail
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"not_eq".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(!res.is_success());
        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());

        // compare and put success
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"value".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());

        // If compare-and-put is success, previous value doesn't need to be returned.
        assert!(res.take_prev_kv().is_none());
    }

    #[tokio::test]
    async fn test_delete_with_key() {
        let tc = new_client("test_delete_with_key").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_key(tc.key("key-0"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(1, res.deleted());
        let mut kvs = res.take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_delete_with_prefix() {
        let tc = new_client("test_delete_with_prefix").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_prefix(tc.key("key-"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(10, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_delete_with_range() {
        let tc = new_client("test_delete_with_range").await;
        tc.gen_data().await;

        // Range end is exclusive: deletes key-2 .. key-6.
        let req = DeleteRangeRequest::new()
            .with_range(tc.key("key-2"), tc.key("key-7"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(5, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(5, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(
                format!("{}-{}", "value", i + 2).into_bytes(),
                kv.take_value()
            );
        }
    }

    /// A no-op decoder for `PaginationStream`; the test only counts entries.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }

    #[tokio::test]
    async fn test_cluster_client_adaptive_range() {
        let tc = new_client("test_cluster_client").await;
        let in_memory = tc.in_memory().unwrap();
        let cluster_client = tc.client.cluster_client().unwrap();
        let mut rng = rand::rng();

        // Generates rough 10MB data, which is larger than the default grpc message size limit.
        for i in 0..10 {
            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
            in_memory
                .put(
                    PutRequest::new()
                        .with_key(format!("__prefix/{i}").as_bytes())
                        .with_value(data.clone()),
                )
                .await
                .unwrap();
        }

        let req = RangeRequest::new().with_prefix(b"__prefix/");
        let stream =
            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();

        let res = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(10, res.len());
    }
}