meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::error::{
38    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
39};
40use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
41use common_meta::kv_backend::KvBackendRef;
42use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::KeyValue;
45use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
46use common_meta::rpc::procedure::{
47    AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
48    MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
49    RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
50};
51use common_meta::rpc::store::{
52    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
53    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
54    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
55};
56use common_telemetry::info;
57use futures::TryStreamExt;
58use heartbeat::Client as HeartbeatClient;
59use procedure::Client as ProcedureClient;
60use snafu::{OptionExt, ResultExt};
61use store::Client as StoreClient;
62
63pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
64use crate::error::{
65    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
66    Result,
67};
68
/// Identifier carried by a [`MetaClient`] and handed to each of its sub-clients.
pub type Id = u64;

/// Retry limit passed to the heartbeat client when it is created in
/// [`MetaClientBuilder::build`].
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Retry limit passed to the procedure client when it is created in
/// [`MetaClientBuilder::build`].
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Retry limit passed to the cluster client when it is created in
/// [`MetaClientBuilder::build`].
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
74
/// Builder that decides which sub-clients a [`MetaClient`] is created with.
///
/// The derived `Default` leaves every `enable_*` flag `false` and every
/// optional field `None`.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Id given to the built client and to each sub-client.
    id: Id,
    /// Role given to each sub-client.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client installed on the built client.
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager for the built client; when `None`, `MetaClient::new`
    /// is used and the client keeps its default channel manager.
    channel_manager: Option<ChannelManager>,
    /// Channel manager override for the procedure client; falls back to the
    /// client's channel manager when `None`.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager override for the heartbeat client; falls back to the
    /// client's channel manager when `None`.
    heartbeat_channel_manager: Option<ChannelManager>,
}
88
89impl MetaClientBuilder {
90    pub fn new(member_id: u64, role: Role) -> Self {
91        Self {
92            id: member_id,
93            role,
94            ..Default::default()
95        }
96    }
97
98    /// Returns the role of Frontend's default options.
99    pub fn frontend_default_options() -> Self {
100        // Frontend does not need a member id.
101        Self::new(0, Role::Frontend)
102            .enable_store()
103            .enable_heartbeat()
104            .enable_procedure()
105            .enable_access_cluster_info()
106    }
107
108    /// Returns the role of Datanode's default options.
109    pub fn datanode_default_options(member_id: u64) -> Self {
110        Self::new(member_id, Role::Datanode)
111            .enable_store()
112            .enable_heartbeat()
113    }
114
115    /// Returns the role of Flownode's default options.
116    pub fn flownode_default_options(member_id: u64) -> Self {
117        Self::new(member_id, Role::Flownode)
118            .enable_store()
119            .enable_heartbeat()
120            .enable_procedure()
121            .enable_access_cluster_info()
122    }
123
124    pub fn enable_heartbeat(self) -> Self {
125        Self {
126            enable_heartbeat: true,
127            ..self
128        }
129    }
130
131    pub fn enable_store(self) -> Self {
132        Self {
133            enable_store: true,
134            ..self
135        }
136    }
137
138    pub fn enable_procedure(self) -> Self {
139        Self {
140            enable_procedure: true,
141            ..self
142        }
143    }
144
145    pub fn enable_access_cluster_info(self) -> Self {
146        Self {
147            enable_access_cluster_info: true,
148            ..self
149        }
150    }
151
152    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
153        Self {
154            channel_manager: Some(channel_manager),
155            ..self
156        }
157    }
158
159    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
160        Self {
161            ddl_channel_manager: Some(channel_manager),
162            ..self
163        }
164    }
165
166    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
167        Self {
168            heartbeat_channel_manager: Some(channel_manager),
169            ..self
170        }
171    }
172
173    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
174        Self {
175            region_follower: Some(region_follower),
176            ..self
177        }
178    }
179
180    pub fn build(self) -> MetaClient {
181        let mut client = if let Some(mgr) = self.channel_manager {
182            MetaClient::with_channel_manager(self.id, mgr)
183        } else {
184            MetaClient::new(self.id)
185        };
186
187        let mgr = client.channel_manager.clone();
188
189        if self.enable_heartbeat {
190            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
191            client.heartbeat = Some(HeartbeatClient::new(
192                self.id,
193                self.role,
194                mgr,
195                DEFAULT_ASK_LEADER_MAX_RETRY,
196            ));
197        }
198
199        if self.enable_store {
200            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
201        }
202
203        if self.enable_procedure {
204            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
205            client.procedure = Some(ProcedureClient::new(
206                self.id,
207                self.role,
208                mgr,
209                DEFAULT_SUBMIT_DDL_MAX_RETRY,
210            ));
211        }
212
213        if self.enable_access_cluster_info {
214            client.cluster = Some(ClusterClient::new(
215                self.id,
216                self.role,
217                mgr,
218                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
219            ))
220        }
221
222        if let Some(region_follower) = self.region_follower {
223            client.region_follower = Some(region_follower);
224        }
225
226        client
227    }
228}
229
/// Client facade that bundles the optional sub-clients used to talk to
/// `metasrv`.
///
/// A sub-client field is `None` unless it was enabled on the
/// [`MetaClientBuilder`]; the corresponding accessor then returns a
/// `NotStarted` error.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Id of this client, returned by `id()`.
    id: Id,
    /// Channel manager whose config is returned by `channel_config()`.
    channel_manager: ChannelManager,
    /// Heartbeat sub-client, if enabled.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store sub-client, if enabled.
    store: Option<StoreClient>,
    /// Procedure sub-client, if enabled.
    procedure: Option<ProcedureClient>,
    /// Cluster-info sub-client, if enabled.
    cluster: Option<ClusterClient>,
    /// Region follower client, if one was supplied to the builder.
    region_follower: Option<RegionFollowerClientRef>,
}
240
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an [`AddRegionFollowerRequest`].
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a [`RemoveRegionFollowerRequest`].
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Handles an [`AddTableFollowerRequest`].
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Handles a [`RemoveTableFollowerRequest`].
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client with the given urls.
    ///
    /// NOTE(review): `urls` are presumably Metasrv addresses, as for
    /// `MetaClient::start` — confirm against implementors.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a leader provider instead of a url list.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
258
259#[async_trait::async_trait]
260impl ProcedureExecutor for MetaClient {
261    async fn submit_ddl_task(
262        &self,
263        _ctx: &ExecutorContext,
264        request: SubmitDdlTaskRequest,
265    ) -> MetaResult<SubmitDdlTaskResponse> {
266        self.submit_ddl_task(request)
267            .await
268            .map_err(BoxedError::new)
269            .context(meta_error::ExternalSnafu)
270    }
271
272    async fn migrate_region(
273        &self,
274        _ctx: &ExecutorContext,
275        request: MigrateRegionRequest,
276    ) -> MetaResult<MigrateRegionResponse> {
277        self.migrate_region(request)
278            .await
279            .map_err(BoxedError::new)
280            .context(meta_error::ExternalSnafu)
281    }
282
283    async fn reconcile(
284        &self,
285        _ctx: &ExecutorContext,
286        request: ReconcileRequest,
287    ) -> MetaResult<ReconcileResponse> {
288        self.reconcile(request)
289            .await
290            .map_err(BoxedError::new)
291            .context(meta_error::ExternalSnafu)
292    }
293
294    async fn manage_region_follower(
295        &self,
296        _ctx: &ExecutorContext,
297        request: ManageRegionFollowerRequest,
298    ) -> MetaResult<()> {
299        if let Some(region_follower) = &self.region_follower {
300            match request {
301                ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
302                    region_follower
303                        .add_region_follower(add_region_follower_request)
304                        .await
305                }
306                ManageRegionFollowerRequest::RemoveRegionFollower(
307                    remove_region_follower_request,
308                ) => {
309                    region_follower
310                        .remove_region_follower(remove_region_follower_request)
311                        .await
312                }
313                ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
314                    region_follower
315                        .add_table_follower(add_table_follower_request)
316                        .await
317                }
318                ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
319                    region_follower
320                        .remove_table_follower(remove_table_follower_request)
321                        .await
322                }
323            }
324            .map_err(BoxedError::new)
325            .context(meta_error::ExternalSnafu)
326        } else {
327            UnsupportedSnafu {
328                operation: "manage_region_follower",
329            }
330            .fail()
331        }
332    }
333
334    async fn query_procedure_state(
335        &self,
336        _ctx: &ExecutorContext,
337        pid: &str,
338    ) -> MetaResult<ProcedureStateResponse> {
339        self.query_procedure_state(pid)
340            .await
341            .map_err(BoxedError::new)
342            .context(meta_error::ExternalSnafu)
343    }
344
345    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
346        self.procedure_client()
347            .map_err(BoxedError::new)
348            .context(meta_error::ExternalSnafu)?
349            .list_procedures()
350            .await
351            .map_err(BoxedError::new)
352            .context(meta_error::ExternalSnafu)
353    }
354}
355
356#[async_trait::async_trait]
357impl ClusterInfo for MetaClient {
358    type Error = Error;
359
360    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
361        let cluster_client = self.cluster_client()?;
362
363        let (get_metasrv_nodes, nodes_key_prefix) = match role {
364            None => (true, Some(NodeInfoKey::key_prefix())),
365            Some(ClusterRole::Metasrv) => (true, None),
366            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
367        };
368
369        let mut nodes = if get_metasrv_nodes {
370            let last_activity_ts = -1; // Metasrv does not provide this information.
371
372            let (leader, followers) = cluster_client.get_metasrv_peers().await?;
373            followers
374                .into_iter()
375                .map(|node| NodeInfo {
376                    peer: node.peer.unwrap_or_default(),
377                    last_activity_ts,
378                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
379                    version: node.version,
380                    git_commit: node.git_commit,
381                    start_time_ms: node.start_time_ms,
382                    cpus: node.cpus,
383                    memory_bytes: node.memory_bytes,
384                })
385                .chain(leader.into_iter().map(|node| NodeInfo {
386                    peer: node.peer.unwrap_or_default(),
387                    last_activity_ts,
388                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
389                    version: node.version,
390                    git_commit: node.git_commit,
391                    start_time_ms: node.start_time_ms,
392                    cpus: node.cpus,
393                    memory_bytes: node.memory_bytes,
394                }))
395                .collect::<Vec<_>>()
396        } else {
397            Vec::new()
398        };
399
400        if let Some(prefix) = nodes_key_prefix {
401            let req = RangeRequest::new().with_prefix(prefix);
402            let res = cluster_client.range(req).await?;
403            for kv in res.kvs {
404                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
405            }
406        }
407
408        Ok(nodes)
409    }
410
411    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
412        let cluster_kv_backend = Arc::new(self.cluster_client()?);
413        let range_prefix = DatanodeStatKey::prefix_key();
414        let req = RangeRequest::new().with_prefix(range_prefix);
415        let stream =
416            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
417        let mut datanode_stats = stream
418            .try_collect::<Vec<_>>()
419            .await
420            .context(ConvertMetaResponseSnafu)?;
421        let region_stats = datanode_stats
422            .iter_mut()
423            .flat_map(|datanode_stat| {
424                let last = datanode_stat.stats.pop();
425                last.map(|stat| stat.region_stats).unwrap_or_default()
426            })
427            .collect::<Vec<_>>();
428
429        Ok(region_stats)
430    }
431
432    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
433        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
434        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
435        let flow_state_manager = FlowStateManager::new(cluster_backend);
436        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
437
438        Ok(res.map(|r| r.into()))
439    }
440}
441
442fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
443    DatanodeStatValue::try_from(kv.value)
444        .map_err(BoxedError::new)
445        .context(ExternalSnafu)
446}
447
448impl MetaClient {
449    pub fn new(id: Id) -> Self {
450        Self {
451            id,
452            ..Default::default()
453        }
454    }
455
456    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
457        Self {
458            id,
459            channel_manager,
460            ..Default::default()
461        }
462    }
463
464    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
465    where
466        U: AsRef<str>,
467        A: AsRef<[U]> + Clone,
468    {
469        info!("MetaClient channel config: {:?}", self.channel_config());
470
471        if let Some(client) = &mut self.region_follower {
472            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
473            client.start(&urls).await?;
474            info!("Region follower client started");
475        }
476        if let Some(client) = &mut self.heartbeat {
477            client.start(urls.clone()).await?;
478            info!("Heartbeat client started");
479        }
480        if let Some(client) = &mut self.store {
481            client.start(urls.clone()).await?;
482            info!("Store client started");
483        }
484        if let Some(client) = &mut self.procedure {
485            client.start(urls.clone()).await?;
486            info!("DDL client started");
487        }
488        if let Some(client) = &mut self.cluster {
489            client.start(urls).await?;
490            info!("Cluster client started");
491        }
492
493        Ok(())
494    }
495
496    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
497    pub(crate) async fn start_with<U, A>(
498        &mut self,
499        leader_provider: LeaderProviderRef,
500        peers: A,
501    ) -> Result<()>
502    where
503        U: AsRef<str>,
504        A: AsRef<[U]> + Clone,
505    {
506        if let Some(client) = &self.region_follower {
507            info!("Starting region follower client ...");
508            client.start_with(leader_provider.clone()).await?;
509        }
510
511        if let Some(client) = &self.heartbeat {
512            info!("Starting heartbeat client ...");
513            client.start_with(leader_provider.clone()).await?;
514        }
515
516        if let Some(client) = &mut self.store {
517            info!("Starting store client ...");
518            client.start(peers.clone()).await?;
519        }
520
521        if let Some(client) = &self.procedure {
522            info!("Starting procedure client ...");
523            client.start_with(leader_provider.clone()).await?;
524        }
525
526        if let Some(client) = &mut self.cluster {
527            info!("Starting cluster client ...");
528            client.start_with(leader_provider).await?;
529        }
530        Ok(())
531    }
532
533    /// Ask the leader address of `metasrv`, and the heartbeat component
534    /// needs to create a bidirectional streaming to the leader.
535    pub async fn ask_leader(&self) -> Result<String> {
536        self.heartbeat_client()?.ask_leader().await
537    }
538
539    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
540    /// other end is the leader of `metasrv`.
541    ///
542    /// The `datanode` needs to use the sender to continuously send heartbeat
543    /// packets (some self-state data), and the receiver can receive a response
544    /// from "metasrv" (which may contain some scheduling instructions).
545    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
546        self.heartbeat_client()?.heartbeat().await
547    }
548
549    /// Range gets the keys in the range from the key-value store.
550    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
551        self.store_client()?
552            .range(req.into())
553            .await?
554            .try_into()
555            .context(ConvertMetaResponseSnafu)
556    }
557
558    /// Put puts the given key into the key-value store.
559    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
560        self.store_client()?
561            .put(req.into())
562            .await?
563            .try_into()
564            .context(ConvertMetaResponseSnafu)
565    }
566
567    /// BatchGet atomically get values by the given keys from the key-value store.
568    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
569        self.store_client()?
570            .batch_get(req.into())
571            .await?
572            .try_into()
573            .context(ConvertMetaResponseSnafu)
574    }
575
576    /// BatchPut atomically puts the given keys into the key-value store.
577    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
578        self.store_client()?
579            .batch_put(req.into())
580            .await?
581            .try_into()
582            .context(ConvertMetaResponseSnafu)
583    }
584
585    /// BatchDelete atomically deletes the given keys from the key-value store.
586    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
587        self.store_client()?
588            .batch_delete(req.into())
589            .await?
590            .try_into()
591            .context(ConvertMetaResponseSnafu)
592    }
593
594    /// CompareAndPut atomically puts the value to the given updated
595    /// value if the current value == the expected value.
596    pub async fn compare_and_put(
597        &self,
598        req: CompareAndPutRequest,
599    ) -> Result<CompareAndPutResponse> {
600        self.store_client()?
601            .compare_and_put(req.into())
602            .await?
603            .try_into()
604            .context(ConvertMetaResponseSnafu)
605    }
606
607    /// DeleteRange deletes the given range from the key-value store.
608    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
609        self.store_client()?
610            .delete_range(req.into())
611            .await?
612            .try_into()
613            .context(ConvertMetaResponseSnafu)
614    }
615
616    /// Query the procedure state by its id.
617    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
618        self.procedure_client()?.query_procedure_state(pid).await
619    }
620
621    /// Submit a region migration task.
622    pub async fn migrate_region(
623        &self,
624        request: MigrateRegionRequest,
625    ) -> Result<MigrateRegionResponse> {
626        self.procedure_client()?
627            .migrate_region(
628                request.region_id,
629                request.from_peer,
630                request.to_peer,
631                request.timeout,
632            )
633            .await
634    }
635
636    /// Reconcile the procedure state.
637    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
638        self.procedure_client()?.reconcile(request).await
639    }
640
641    /// Submit a DDL task
642    pub async fn submit_ddl_task(
643        &self,
644        req: SubmitDdlTaskRequest,
645    ) -> Result<SubmitDdlTaskResponse> {
646        let res = self
647            .procedure_client()?
648            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
649            .await?
650            .try_into()
651            .context(ConvertMetaResponseSnafu)?;
652
653        Ok(res)
654    }
655
656    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
657        self.heartbeat.clone().context(NotStartedSnafu {
658            name: "heartbeat_client",
659        })
660    }
661
662    pub fn store_client(&self) -> Result<StoreClient> {
663        self.store.clone().context(NotStartedSnafu {
664            name: "store_client",
665        })
666    }
667
668    pub fn procedure_client(&self) -> Result<ProcedureClient> {
669        self.procedure.clone().context(NotStartedSnafu {
670            name: "procedure_client",
671        })
672    }
673
674    pub fn cluster_client(&self) -> Result<ClusterClient> {
675        self.cluster.clone().context(NotStartedSnafu {
676            name: "cluster_client",
677        })
678    }
679
680    pub fn channel_config(&self) -> &ChannelConfig {
681        self.channel_manager.config()
682    }
683
684    pub fn id(&self) -> Id {
685        self.id
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use std::sync::atomic::{AtomicUsize, Ordering};
692
693    use api::v1::meta::{HeartbeatRequest, Peer};
694    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
695    use rand::Rng;
696
697    use super::*;
698    use crate::error;
699    use crate::mocks::{self, MockMetaContext};
700
    /// Prefix of every key written by these tests.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// Test fixture pairing a mock-backed [`MetaClient`] with a key namespace.
    struct TestClient {
        /// Namespace inserted into every key produced by `key()`.
        ns: String,
        /// Client under test.
        client: MetaClient,
        /// Mock context returned alongside the client by the mock factory.
        meta_ctx: MockMetaContext,
    }
708
709    impl TestClient {
710        async fn new(ns: impl Into<String>) -> Self {
711            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
712            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
713            Self {
714                ns: ns.into(),
715                client,
716                meta_ctx,
717            }
718        }
719
720        fn key(&self, name: &str) -> Vec<u8> {
721            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
722        }
723
724        async fn gen_data(&self) {
725            for i in 0..10 {
726                let req = PutRequest::new()
727                    .with_key(self.key(&format!("key-{i}")))
728                    .with_value(format!("{}-{}", "value", i).into_bytes())
729                    .with_prev_kv();
730                let res = self.client.put(req).await;
731                let _ = res.unwrap();
732            }
733        }
734
735        async fn clear_data(&self) {
736            let req =
737                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
738            let res = self.client.delete_range(req).await;
739            let _ = res.unwrap();
740        }
741
742        #[allow(dead_code)]
743        fn kv_backend(&self) -> KvBackendRef {
744            self.meta_ctx.kv_backend.clone()
745        }
746
747        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
748            self.meta_ctx.in_memory.clone()
749        }
750    }
751
752    async fn new_client(ns: impl Into<String>) -> TestClient {
753        let client = TestClient::new(ns).await;
754        client.clear_data().await;
755        client
756    }
757
758    #[tokio::test]
759    async fn test_meta_client_builder() {
760        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
761
762        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
763            .enable_heartbeat()
764            .build();
765        let _ = meta_client.heartbeat_client().unwrap();
766        assert!(meta_client.store_client().is_err());
767        meta_client.start(urls).await.unwrap();
768
769        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
770        assert!(meta_client.heartbeat_client().is_err());
771        assert!(meta_client.store_client().is_err());
772        meta_client.start(urls).await.unwrap();
773
774        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
775            .enable_store()
776            .build();
777        assert!(meta_client.heartbeat_client().is_err());
778        let _ = meta_client.store_client().unwrap();
779        meta_client.start(urls).await.unwrap();
780
781        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
782            .enable_heartbeat()
783            .enable_store()
784            .build();
785        assert_eq!(2, meta_client.id());
786        assert_eq!(2, meta_client.id());
787        let _ = meta_client.heartbeat_client().unwrap();
788        let _ = meta_client.store_client().unwrap();
789        meta_client.start(urls).await.unwrap();
790    }
791
792    #[tokio::test]
793    async fn test_not_start_heartbeat_client() {
794        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
795        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
796            .enable_store()
797            .build();
798        meta_client.start(urls).await.unwrap();
799        let res = meta_client.ask_leader().await;
800        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
801    }
802
803    #[tokio::test]
804    async fn test_not_start_store_client() {
805        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
806        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
807            .enable_heartbeat()
808            .build();
809
810        meta_client.start(urls).await.unwrap();
811        let res = meta_client.put(PutRequest::default()).await;
812        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
813    }
814
815    #[tokio::test]
816    async fn test_ask_leader() {
817        let tc = new_client("test_ask_leader").await;
818        tc.client.ask_leader().await.unwrap();
819    }
820
821    #[tokio::test]
822    async fn test_heartbeat() {
823        let tc = new_client("test_heartbeat").await;
824        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
825        // send heartbeats
826
827        let request_sent = Arc::new(AtomicUsize::new(0));
828        let request_sent_clone = request_sent.clone();
829        let _handle = tokio::spawn(async move {
830            for _ in 0..5 {
831                let req = HeartbeatRequest {
832                    peer: Some(Peer {
833                        id: 1,
834                        addr: "meta_client_peer".to_string(),
835                    }),
836                    ..Default::default()
837                };
838                sender.send(req).await.unwrap();
839                request_sent_clone.fetch_add(1, Ordering::Relaxed);
840            }
841        });
842
843        let heartbeat_count = Arc::new(AtomicUsize::new(0));
844        let heartbeat_count_clone = heartbeat_count.clone();
845        let handle = tokio::spawn(async move {
846            while let Some(_resp) = receiver.message().await.unwrap() {
847                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
848            }
849        });
850
851        handle.await.unwrap();
852        //+1 for the initial response
853        assert_eq!(
854            request_sent.load(Ordering::Relaxed) + 1,
855            heartbeat_count.load(Ordering::Relaxed)
856        );
857    }
858
859    #[tokio::test]
860    async fn test_range_get() {
861        let tc = new_client("test_range_get").await;
862        tc.gen_data().await;
863
864        let key = tc.key("key-0");
865        let req = RangeRequest::new().with_key(key.as_slice());
866        let res = tc.client.range(req).await;
867        let mut kvs = res.unwrap().take_kvs();
868        assert_eq!(1, kvs.len());
869        let mut kv = kvs.pop().unwrap();
870        assert_eq!(key, kv.take_key());
871        assert_eq!(b"value-0".to_vec(), kv.take_value());
872    }
873
874    #[tokio::test]
875    async fn test_range_get_prefix() {
876        let tc = new_client("test_range_get_prefix").await;
877        tc.gen_data().await;
878
879        let req = RangeRequest::new().with_prefix(tc.key("key-"));
880        let res = tc.client.range(req).await;
881        let kvs = res.unwrap().take_kvs();
882        assert_eq!(10, kvs.len());
883        for (i, mut kv) in kvs.into_iter().enumerate() {
884            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
885            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
886        }
887    }
888
889    #[tokio::test]
890    async fn test_range() {
891        let tc = new_client("test_range").await;
892        tc.gen_data().await;
893
894        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
895        let res = tc.client.range(req).await;
896        let kvs = res.unwrap().take_kvs();
897        assert_eq!(3, kvs.len());
898        for (i, mut kv) in kvs.into_iter().enumerate() {
899            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
900            assert_eq!(
901                format!("{}-{}", "value", i + 5).into_bytes(),
902                kv.take_value()
903            );
904        }
905    }
906
907    #[tokio::test]
908    async fn test_range_keys_only() {
909        let tc = new_client("test_range_keys_only").await;
910        tc.gen_data().await;
911
912        let req = RangeRequest::new()
913            .with_range(tc.key("key-5"), tc.key("key-8"))
914            .with_keys_only();
915        let res = tc.client.range(req).await;
916        let kvs = res.unwrap().take_kvs();
917        assert_eq!(3, kvs.len());
918        for (i, mut kv) in kvs.into_iter().enumerate() {
919            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
920            assert!(kv.take_value().is_empty());
921        }
922    }
923
924    #[tokio::test]
925    async fn test_put() {
926        let tc = new_client("test_put").await;
927
928        let req = PutRequest::new()
929            .with_key(tc.key("key"))
930            .with_value(b"value".to_vec());
931        let res = tc.client.put(req).await;
932        assert!(res.unwrap().prev_kv.is_none());
933    }
934
935    #[tokio::test]
936    async fn test_put_with_prev_kv() {
937        let tc = new_client("test_put_with_prev_kv").await;
938
939        let key = tc.key("key");
940        let req = PutRequest::new()
941            .with_key(key.as_slice())
942            .with_value(b"value".to_vec())
943            .with_prev_kv();
944        let res = tc.client.put(req).await;
945        assert!(res.unwrap().prev_kv.is_none());
946
947        let req = PutRequest::new()
948            .with_key(key.as_slice())
949            .with_value(b"value1".to_vec())
950            .with_prev_kv();
951        let res = tc.client.put(req).await;
952        let mut kv = res.unwrap().prev_kv.unwrap();
953        assert_eq!(key, kv.take_key());
954        assert_eq!(b"value".to_vec(), kv.take_value());
955    }
956
957    #[tokio::test]
958    async fn test_batch_put() {
959        let tc = new_client("test_batch_put").await;
960
961        let mut req = BatchPutRequest::new();
962        for i in 0..275 {
963            req = req.add_kv(
964                tc.key(&format!("key-{}", i)),
965                format!("value-{}", i).into_bytes(),
966            );
967        }
968
969        let res = tc.client.batch_put(req).await;
970        assert_eq!(0, res.unwrap().take_prev_kvs().len());
971
972        let req = RangeRequest::new().with_prefix(tc.key("key-"));
973        let res = tc.client.range(req).await;
974        let kvs = res.unwrap().take_kvs();
975        assert_eq!(275, kvs.len());
976    }
977
978    #[tokio::test]
979    async fn test_batch_get() {
980        let tc = new_client("test_batch_get").await;
981        tc.gen_data().await;
982
983        let mut req = BatchGetRequest::default();
984        for i in 0..256 {
985            req = req.add_key(tc.key(&format!("key-{}", i)));
986        }
987        let res = tc.client.batch_get(req).await.unwrap();
988        assert_eq!(10, res.kvs.len());
989
990        let req = BatchGetRequest::default()
991            .add_key(tc.key("key-1"))
992            .add_key(tc.key("key-999"));
993        let res = tc.client.batch_get(req).await.unwrap();
994        assert_eq!(1, res.kvs.len());
995    }
996
997    #[tokio::test]
998    async fn test_batch_put_with_prev_kv() {
999        let tc = new_client("test_batch_put_with_prev_kv").await;
1000
1001        let key = tc.key("key");
1002        let key2 = tc.key("key2");
1003        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1004        let res = tc.client.batch_put(req).await;
1005        assert_eq!(0, res.unwrap().take_prev_kvs().len());
1006
1007        let req = BatchPutRequest::new()
1008            .add_kv(key.as_slice(), b"value-".to_vec())
1009            .add_kv(key2.as_slice(), b"value2-".to_vec())
1010            .with_prev_kv();
1011        let res = tc.client.batch_put(req).await;
1012        let mut kvs = res.unwrap().take_prev_kvs();
1013        assert_eq!(1, kvs.len());
1014        let mut kv = kvs.pop().unwrap();
1015        assert_eq!(key, kv.take_key());
1016        assert_eq!(b"value".to_vec(), kv.take_value());
1017    }
1018
1019    #[tokio::test]
1020    async fn test_compare_and_put() {
1021        let tc = new_client("test_compare_and_put").await;
1022
1023        let key = tc.key("key");
1024        let req = CompareAndPutRequest::new()
1025            .with_key(key.as_slice())
1026            .with_expect(b"expect".to_vec())
1027            .with_value(b"value".to_vec());
1028        let res = tc.client.compare_and_put(req).await;
1029        assert!(!res.unwrap().is_success());
1030
1031        // create if absent
1032        let req = CompareAndPutRequest::new()
1033            .with_key(key.as_slice())
1034            .with_value(b"value".to_vec());
1035        let res = tc.client.compare_and_put(req).await;
1036        let mut res = res.unwrap();
1037        assert!(res.is_success());
1038        assert!(res.take_prev_kv().is_none());
1039
1040        // compare and put fail
1041        let req = CompareAndPutRequest::new()
1042            .with_key(key.as_slice())
1043            .with_expect(b"not_eq".to_vec())
1044            .with_value(b"value2".to_vec());
1045        let res = tc.client.compare_and_put(req).await;
1046        let mut res = res.unwrap();
1047        assert!(!res.is_success());
1048        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1049
1050        // compare and put success
1051        let req = CompareAndPutRequest::new()
1052            .with_key(key.as_slice())
1053            .with_expect(b"value".to_vec())
1054            .with_value(b"value2".to_vec());
1055        let res = tc.client.compare_and_put(req).await;
1056        let mut res = res.unwrap();
1057        assert!(res.is_success());
1058
1059        // If compare-and-put is success, previous value doesn't need to be returned.
1060        assert!(res.take_prev_kv().is_none());
1061    }
1062
1063    #[tokio::test]
1064    async fn test_delete_with_key() {
1065        let tc = new_client("test_delete_with_key").await;
1066        tc.gen_data().await;
1067
1068        let req = DeleteRangeRequest::new()
1069            .with_key(tc.key("key-0"))
1070            .with_prev_kv();
1071        let res = tc.client.delete_range(req).await;
1072        let mut res = res.unwrap();
1073        assert_eq!(1, res.deleted());
1074        let mut kvs = res.take_prev_kvs();
1075        assert_eq!(1, kvs.len());
1076        let mut kv = kvs.pop().unwrap();
1077        assert_eq!(b"value-0".to_vec(), kv.take_value());
1078    }
1079
1080    #[tokio::test]
1081    async fn test_delete_with_prefix() {
1082        let tc = new_client("test_delete_with_prefix").await;
1083        tc.gen_data().await;
1084
1085        let req = DeleteRangeRequest::new()
1086            .with_prefix(tc.key("key-"))
1087            .with_prev_kv();
1088        let res = tc.client.delete_range(req).await;
1089        let mut res = res.unwrap();
1090        assert_eq!(10, res.deleted());
1091        let kvs = res.take_prev_kvs();
1092        assert_eq!(10, kvs.len());
1093        for (i, mut kv) in kvs.into_iter().enumerate() {
1094            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1095        }
1096    }
1097
1098    #[tokio::test]
1099    async fn test_delete_with_range() {
1100        let tc = new_client("test_delete_with_range").await;
1101        tc.gen_data().await;
1102
1103        let req = DeleteRangeRequest::new()
1104            .with_range(tc.key("key-2"), tc.key("key-7"))
1105            .with_prev_kv();
1106        let res = tc.client.delete_range(req).await;
1107        let mut res = res.unwrap();
1108        assert_eq!(5, res.deleted());
1109        let kvs = res.take_prev_kvs();
1110        assert_eq!(5, kvs.len());
1111        for (i, mut kv) in kvs.into_iter().enumerate() {
1112            assert_eq!(
1113                format!("{}-{}", "value", i + 2).into_bytes(),
1114                kv.take_value()
1115            );
1116        }
1117    }
1118
1119    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1120        Ok(())
1121    }
1122
1123    #[tokio::test]
1124    async fn test_cluster_client_adaptive_range() {
1125        let tx = new_client("test_cluster_client").await;
1126        let in_memory = tx.in_memory().unwrap();
1127        let cluster_client = tx.client.cluster_client().unwrap();
1128        let mut rng = rand::rng();
1129
1130        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1131        for i in 0..10 {
1132            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1133            in_memory
1134                .put(
1135                    PutRequest::new()
1136                        .with_key(format!("__prefix/{i}").as_bytes())
1137                        .with_value(data.clone()),
1138                )
1139                .await
1140                .unwrap();
1141        }
1142
1143        let req = RangeRequest::new().with_prefix(b"__prefix/");
1144        let stream =
1145            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1146
1147        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1148        assert_eq!(10, res.len());
1149    }
1150}