meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
38use common_meta::error::{
39    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
40};
41use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46    AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47    RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65    Result,
66};
67
/// Identifier type of a meta client (see [`MetaClient::id`]).
pub type Id = u64;

/// Max-retry value handed to `HeartbeatClient::new` by [`MetaClientBuilder::build`].
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Max-retry value handed to `ProcedureClient::new` by [`MetaClientBuilder::build`].
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Max-retry value handed to `ClusterClient::new` by [`MetaClientBuilder::build`].
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
73
/// Builder that selects which sub-clients a [`MetaClient`] is created with.
///
/// The struct derives `Default`, so every `enable_*` flag starts out `false`
/// and every optional field starts out `None`.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Member id handed to the client and to every sub-client that is built.
    id: Id,
    /// Role handed to every sub-client that is built.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client that `build` copies into the client.
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager of the client itself; when `None`, `build` uses
    /// `MetaClient::new`, which takes the default one.
    channel_manager: Option<ChannelManager>,
    /// Channel manager override for the procedure client; when `None`, the
    /// client's own channel manager is used.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager override for the heartbeat client; when `None`, the
    /// client's own channel manager is used.
    heartbeat_channel_manager: Option<ChannelManager>,
}
87
88impl MetaClientBuilder {
89    pub fn new(member_id: u64, role: Role) -> Self {
90        Self {
91            id: member_id,
92            role,
93            ..Default::default()
94        }
95    }
96
97    /// Returns the role of Frontend's default options.
98    pub fn frontend_default_options() -> Self {
99        // Frontend does not need a member id.
100        Self::new(0, Role::Frontend)
101            .enable_store()
102            .enable_heartbeat()
103            .enable_procedure()
104            .enable_access_cluster_info()
105    }
106
107    /// Returns the role of Datanode's default options.
108    pub fn datanode_default_options(member_id: u64) -> Self {
109        Self::new(member_id, Role::Datanode)
110            .enable_store()
111            .enable_heartbeat()
112    }
113
114    /// Returns the role of Flownode's default options.
115    pub fn flownode_default_options(member_id: u64) -> Self {
116        Self::new(member_id, Role::Flownode)
117            .enable_store()
118            .enable_heartbeat()
119            .enable_procedure()
120            .enable_access_cluster_info()
121    }
122
123    pub fn enable_heartbeat(self) -> Self {
124        Self {
125            enable_heartbeat: true,
126            ..self
127        }
128    }
129
130    pub fn enable_store(self) -> Self {
131        Self {
132            enable_store: true,
133            ..self
134        }
135    }
136
137    pub fn enable_procedure(self) -> Self {
138        Self {
139            enable_procedure: true,
140            ..self
141        }
142    }
143
144    pub fn enable_access_cluster_info(self) -> Self {
145        Self {
146            enable_access_cluster_info: true,
147            ..self
148        }
149    }
150
151    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152        Self {
153            channel_manager: Some(channel_manager),
154            ..self
155        }
156    }
157
158    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159        Self {
160            ddl_channel_manager: Some(channel_manager),
161            ..self
162        }
163    }
164
165    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166        Self {
167            heartbeat_channel_manager: Some(channel_manager),
168            ..self
169        }
170    }
171
172    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173        Self {
174            region_follower: Some(region_follower),
175            ..self
176        }
177    }
178
179    pub fn build(self) -> MetaClient {
180        let mut client = if let Some(mgr) = self.channel_manager {
181            MetaClient::with_channel_manager(self.id, mgr)
182        } else {
183            MetaClient::new(self.id)
184        };
185
186        let mgr = client.channel_manager.clone();
187
188        if self.enable_heartbeat {
189            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190            client.heartbeat = Some(HeartbeatClient::new(
191                self.id,
192                self.role,
193                mgr,
194                DEFAULT_ASK_LEADER_MAX_RETRY,
195            ));
196        }
197
198        if self.enable_store {
199            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200        }
201
202        if self.enable_procedure {
203            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204            client.procedure = Some(ProcedureClient::new(
205                self.id,
206                self.role,
207                mgr,
208                DEFAULT_SUBMIT_DDL_MAX_RETRY,
209            ));
210        }
211
212        if self.enable_access_cluster_info {
213            client.cluster = Some(ClusterClient::new(
214                self.id,
215                self.role,
216                mgr,
217                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218            ))
219        }
220
221        if let Some(region_follower) = self.region_follower {
222            client.region_follower = Some(region_follower);
223        }
224
225        client
226    }
227}
228
/// Client that bundles the optional heartbeat, store, procedure, cluster and
/// region follower sub-clients.
///
/// A sub-client that is `None` was not enabled when the client was built; the
/// matching `*_client()` accessor then returns a `NotStarted` error.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Identifier returned by [`MetaClient::id`].
    id: Id,
    /// Channel manager whose config is exposed by [`MetaClient::channel_config`].
    channel_manager: ChannelManager,
    /// Heartbeat sub-client, if enabled.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store sub-client, if enabled.
    store: Option<StoreClient>,
    /// Procedure sub-client, if enabled.
    procedure: Option<ProcedureClient>,
    /// Cluster sub-client, if enabled.
    cluster: Option<ClusterClient>,
    /// Region follower client; when `None`, the region follower operations of
    /// the `ProcedureExecutor` implementation fail as unsupported.
    region_follower: Option<RegionFollowerClientRef>,
}
239
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an add-region-follower request.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a remove-region-follower request.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Starts the client from a list of URLs. `MetaClient::start` passes the
    /// URLs it was given (presumably Metasrv addresses — confirm with implementors).
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client from a leader provider instead of a URL list.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
253
254#[async_trait::async_trait]
255impl ProcedureExecutor for MetaClient {
256    async fn submit_ddl_task(
257        &self,
258        _ctx: &ExecutorContext,
259        request: SubmitDdlTaskRequest,
260    ) -> MetaResult<SubmitDdlTaskResponse> {
261        self.submit_ddl_task(request)
262            .await
263            .map_err(BoxedError::new)
264            .context(meta_error::ExternalSnafu)
265    }
266
267    async fn migrate_region(
268        &self,
269        _ctx: &ExecutorContext,
270        request: MigrateRegionRequest,
271    ) -> MetaResult<MigrateRegionResponse> {
272        self.migrate_region(request)
273            .await
274            .map_err(BoxedError::new)
275            .context(meta_error::ExternalSnafu)
276    }
277
278    async fn add_region_follower(
279        &self,
280        _ctx: &ExecutorContext,
281        request: AddRegionFollowerRequest,
282    ) -> MetaResult<()> {
283        if let Some(region_follower) = &self.region_follower {
284            region_follower
285                .add_region_follower(request)
286                .await
287                .map_err(BoxedError::new)
288                .context(meta_error::ExternalSnafu)
289        } else {
290            UnsupportedSnafu {
291                operation: "add_region_follower",
292            }
293            .fail()
294        }
295    }
296
297    async fn remove_region_follower(
298        &self,
299        _ctx: &ExecutorContext,
300        request: RemoveRegionFollowerRequest,
301    ) -> MetaResult<()> {
302        if let Some(region_follower) = &self.region_follower {
303            region_follower
304                .remove_region_follower(request)
305                .await
306                .map_err(BoxedError::new)
307                .context(meta_error::ExternalSnafu)
308        } else {
309            UnsupportedSnafu {
310                operation: "remove_region_follower",
311            }
312            .fail()
313        }
314    }
315
316    async fn query_procedure_state(
317        &self,
318        _ctx: &ExecutorContext,
319        pid: &str,
320    ) -> MetaResult<ProcedureStateResponse> {
321        self.query_procedure_state(pid)
322            .await
323            .map_err(BoxedError::new)
324            .context(meta_error::ExternalSnafu)
325    }
326
327    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
328        self.procedure_client()
329            .map_err(BoxedError::new)
330            .context(meta_error::ExternalSnafu)?
331            .list_procedures()
332            .await
333            .map_err(BoxedError::new)
334            .context(meta_error::ExternalSnafu)
335    }
336}
337
338#[async_trait::async_trait]
339impl ClusterInfo for MetaClient {
340    type Error = Error;
341
342    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
343        let cluster_client = self.cluster_client()?;
344
345        let (get_metasrv_nodes, nodes_key_prefix) = match role {
346            None => (true, Some(NodeInfoKey::key_prefix())),
347            Some(ClusterRole::Metasrv) => (true, None),
348            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
349        };
350
351        let mut nodes = if get_metasrv_nodes {
352            let last_activity_ts = -1; // Metasrv does not provide this information.
353
354            let (leader, followers) = cluster_client.get_metasrv_peers().await?;
355            followers
356                .into_iter()
357                .map(|node| NodeInfo {
358                    peer: node.peer.unwrap_or_default(),
359                    last_activity_ts,
360                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
361                    version: node.version,
362                    git_commit: node.git_commit,
363                    start_time_ms: node.start_time_ms,
364                })
365                .chain(leader.into_iter().map(|node| NodeInfo {
366                    peer: node.peer.unwrap_or_default(),
367                    last_activity_ts,
368                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
369                    version: node.version,
370                    git_commit: node.git_commit,
371                    start_time_ms: node.start_time_ms,
372                }))
373                .collect::<Vec<_>>()
374        } else {
375            Vec::new()
376        };
377
378        if let Some(prefix) = nodes_key_prefix {
379            let req = RangeRequest::new().with_prefix(prefix);
380            let res = cluster_client.range(req).await?;
381            for kv in res.kvs {
382                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
383            }
384        }
385
386        Ok(nodes)
387    }
388
389    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
390        let cluster_kv_backend = Arc::new(self.cluster_client()?);
391        let range_prefix = DatanodeStatKey::prefix_key();
392        let req = RangeRequest::new().with_prefix(range_prefix);
393        let stream =
394            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
395        let mut datanode_stats = stream
396            .try_collect::<Vec<_>>()
397            .await
398            .context(ConvertMetaResponseSnafu)?;
399        let region_stats = datanode_stats
400            .iter_mut()
401            .flat_map(|datanode_stat| {
402                let last = datanode_stat.stats.pop();
403                last.map(|stat| stat.region_stats).unwrap_or_default()
404            })
405            .collect::<Vec<_>>();
406
407        Ok(region_stats)
408    }
409
410    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
411        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
412        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
413        let flow_state_manager = FlowStateManager::new(cluster_backend);
414        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
415
416        Ok(res.map(|r| r.into()))
417    }
418}
419
420fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
421    DatanodeStatValue::try_from(kv.value)
422        .map_err(BoxedError::new)
423        .context(ExternalSnafu)
424}
425
426impl MetaClient {
427    pub fn new(id: Id) -> Self {
428        Self {
429            id,
430            ..Default::default()
431        }
432    }
433
434    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
435        Self {
436            id,
437            channel_manager,
438            ..Default::default()
439        }
440    }
441
442    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
443    where
444        U: AsRef<str>,
445        A: AsRef<[U]> + Clone,
446    {
447        info!("MetaClient channel config: {:?}", self.channel_config());
448
449        if let Some(client) = &mut self.region_follower {
450            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
451            client.start(&urls).await?;
452            info!("Region follower client started");
453        }
454        if let Some(client) = &mut self.heartbeat {
455            client.start(urls.clone()).await?;
456            info!("Heartbeat client started");
457        }
458        if let Some(client) = &mut self.store {
459            client.start(urls.clone()).await?;
460            info!("Store client started");
461        }
462        if let Some(client) = &mut self.procedure {
463            client.start(urls.clone()).await?;
464            info!("DDL client started");
465        }
466        if let Some(client) = &mut self.cluster {
467            client.start(urls).await?;
468            info!("Cluster client started");
469        }
470
471        Ok(())
472    }
473
474    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
475    pub(crate) async fn start_with<U, A>(
476        &mut self,
477        leader_provider: LeaderProviderRef,
478        peers: A,
479    ) -> Result<()>
480    where
481        U: AsRef<str>,
482        A: AsRef<[U]> + Clone,
483    {
484        if let Some(client) = &self.region_follower {
485            info!("Starting region follower client ...");
486            client.start_with(leader_provider.clone()).await?;
487        }
488
489        if let Some(client) = &self.heartbeat {
490            info!("Starting heartbeat client ...");
491            client.start_with(leader_provider.clone()).await?;
492        }
493
494        if let Some(client) = &mut self.store {
495            info!("Starting store client ...");
496            client.start(peers.clone()).await?;
497        }
498
499        if let Some(client) = &self.procedure {
500            info!("Starting procedure client ...");
501            client.start_with(leader_provider.clone()).await?;
502        }
503
504        if let Some(client) = &mut self.cluster {
505            info!("Starting cluster client ...");
506            client.start_with(leader_provider).await?;
507        }
508        Ok(())
509    }
510
511    /// Ask the leader address of `metasrv`, and the heartbeat component
512    /// needs to create a bidirectional streaming to the leader.
513    pub async fn ask_leader(&self) -> Result<String> {
514        self.heartbeat_client()?.ask_leader().await
515    }
516
517    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
518    /// other end is the leader of `metasrv`.
519    ///
520    /// The `datanode` needs to use the sender to continuously send heartbeat
521    /// packets (some self-state data), and the receiver can receive a response
522    /// from "metasrv" (which may contain some scheduling instructions).
523    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
524        self.heartbeat_client()?.heartbeat().await
525    }
526
527    /// Range gets the keys in the range from the key-value store.
528    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
529        self.store_client()?
530            .range(req.into())
531            .await?
532            .try_into()
533            .context(ConvertMetaResponseSnafu)
534    }
535
536    /// Put puts the given key into the key-value store.
537    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
538        self.store_client()?
539            .put(req.into())
540            .await?
541            .try_into()
542            .context(ConvertMetaResponseSnafu)
543    }
544
545    /// BatchGet atomically get values by the given keys from the key-value store.
546    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
547        self.store_client()?
548            .batch_get(req.into())
549            .await?
550            .try_into()
551            .context(ConvertMetaResponseSnafu)
552    }
553
554    /// BatchPut atomically puts the given keys into the key-value store.
555    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
556        self.store_client()?
557            .batch_put(req.into())
558            .await?
559            .try_into()
560            .context(ConvertMetaResponseSnafu)
561    }
562
563    /// BatchDelete atomically deletes the given keys from the key-value store.
564    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
565        self.store_client()?
566            .batch_delete(req.into())
567            .await?
568            .try_into()
569            .context(ConvertMetaResponseSnafu)
570    }
571
572    /// CompareAndPut atomically puts the value to the given updated
573    /// value if the current value == the expected value.
574    pub async fn compare_and_put(
575        &self,
576        req: CompareAndPutRequest,
577    ) -> Result<CompareAndPutResponse> {
578        self.store_client()?
579            .compare_and_put(req.into())
580            .await?
581            .try_into()
582            .context(ConvertMetaResponseSnafu)
583    }
584
585    /// DeleteRange deletes the given range from the key-value store.
586    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
587        self.store_client()?
588            .delete_range(req.into())
589            .await?
590            .try_into()
591            .context(ConvertMetaResponseSnafu)
592    }
593
594    /// Query the procedure state by its id.
595    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
596        self.procedure_client()?.query_procedure_state(pid).await
597    }
598
599    /// Submit a region migration task.
600    pub async fn migrate_region(
601        &self,
602        request: MigrateRegionRequest,
603    ) -> Result<MigrateRegionResponse> {
604        self.procedure_client()?
605            .migrate_region(
606                request.region_id,
607                request.from_peer,
608                request.to_peer,
609                request.timeout,
610            )
611            .await
612    }
613
614    /// Submit a DDL task
615    pub async fn submit_ddl_task(
616        &self,
617        req: SubmitDdlTaskRequest,
618    ) -> Result<SubmitDdlTaskResponse> {
619        let res = self
620            .procedure_client()?
621            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
622            .await?
623            .try_into()
624            .context(ConvertMetaResponseSnafu)?;
625
626        Ok(res)
627    }
628
629    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
630        self.heartbeat.clone().context(NotStartedSnafu {
631            name: "heartbeat_client",
632        })
633    }
634
635    pub fn store_client(&self) -> Result<StoreClient> {
636        self.store.clone().context(NotStartedSnafu {
637            name: "store_client",
638        })
639    }
640
641    pub fn procedure_client(&self) -> Result<ProcedureClient> {
642        self.procedure.clone().context(NotStartedSnafu {
643            name: "procedure_client",
644        })
645    }
646
647    pub fn cluster_client(&self) -> Result<ClusterClient> {
648        self.cluster.clone().context(NotStartedSnafu {
649            name: "cluster_client",
650        })
651    }
652
653    pub fn channel_config(&self) -> &ChannelConfig {
654        self.channel_manager.config()
655    }
656
657    pub fn id(&self) -> Id {
658        self.id
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use std::sync::atomic::{AtomicUsize, Ordering};
665
666    use api::v1::meta::{HeartbeatRequest, Peer};
667    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
668    use rand::Rng;
669
670    use super::*;
671    use crate::error;
672    use crate::mocks::{self, MockMetaContext};
673
    /// Prefix of every key built by `TestClient::key`.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// Test fixture pairing a [`MetaClient`] with the mock context it talks to.
    struct TestClient {
        /// Namespace inserted into every key, after `TEST_KEY_PREFIX`.
        ns: String,
        /// Client under test.
        client: MetaClient,
        /// Mock context returned together with the client by the mock helper.
        meta_ctx: MockMetaContext,
    }
681
682    impl TestClient {
683        async fn new(ns: impl Into<String>) -> Self {
684            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
685            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
686            Self {
687                ns: ns.into(),
688                client,
689                meta_ctx,
690            }
691        }
692
693        fn key(&self, name: &str) -> Vec<u8> {
694            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
695        }
696
697        async fn gen_data(&self) {
698            for i in 0..10 {
699                let req = PutRequest::new()
700                    .with_key(self.key(&format!("key-{i}")))
701                    .with_value(format!("{}-{}", "value", i).into_bytes())
702                    .with_prev_kv();
703                let res = self.client.put(req).await;
704                let _ = res.unwrap();
705            }
706        }
707
708        async fn clear_data(&self) {
709            let req =
710                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
711            let res = self.client.delete_range(req).await;
712            let _ = res.unwrap();
713        }
714
715        #[allow(dead_code)]
716        fn kv_backend(&self) -> KvBackendRef {
717            self.meta_ctx.kv_backend.clone()
718        }
719
720        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
721            self.meta_ctx.in_memory.clone()
722        }
723    }
724
725    async fn new_client(ns: impl Into<String>) -> TestClient {
726        let client = TestClient::new(ns).await;
727        client.clear_data().await;
728        client
729    }
730
731    #[tokio::test]
732    async fn test_meta_client_builder() {
733        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
734
735        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
736            .enable_heartbeat()
737            .build();
738        let _ = meta_client.heartbeat_client().unwrap();
739        assert!(meta_client.store_client().is_err());
740        meta_client.start(urls).await.unwrap();
741
742        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
743        assert!(meta_client.heartbeat_client().is_err());
744        assert!(meta_client.store_client().is_err());
745        meta_client.start(urls).await.unwrap();
746
747        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
748            .enable_store()
749            .build();
750        assert!(meta_client.heartbeat_client().is_err());
751        let _ = meta_client.store_client().unwrap();
752        meta_client.start(urls).await.unwrap();
753
754        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
755            .enable_heartbeat()
756            .enable_store()
757            .build();
758        assert_eq!(2, meta_client.id());
759        assert_eq!(2, meta_client.id());
760        let _ = meta_client.heartbeat_client().unwrap();
761        let _ = meta_client.store_client().unwrap();
762        meta_client.start(urls).await.unwrap();
763    }
764
765    #[tokio::test]
766    async fn test_not_start_heartbeat_client() {
767        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
768        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
769            .enable_store()
770            .build();
771        meta_client.start(urls).await.unwrap();
772        let res = meta_client.ask_leader().await;
773        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
774    }
775
776    #[tokio::test]
777    async fn test_not_start_store_client() {
778        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
779        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
780            .enable_heartbeat()
781            .build();
782
783        meta_client.start(urls).await.unwrap();
784        let res = meta_client.put(PutRequest::default()).await;
785        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
786    }
787
788    #[tokio::test]
789    async fn test_ask_leader() {
790        let tc = new_client("test_ask_leader").await;
791        tc.client.ask_leader().await.unwrap();
792    }
793
794    #[tokio::test]
795    async fn test_heartbeat() {
796        let tc = new_client("test_heartbeat").await;
797        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
798        // send heartbeats
799
800        let request_sent = Arc::new(AtomicUsize::new(0));
801        let request_sent_clone = request_sent.clone();
802        let _handle = tokio::spawn(async move {
803            for _ in 0..5 {
804                let req = HeartbeatRequest {
805                    peer: Some(Peer {
806                        id: 1,
807                        addr: "meta_client_peer".to_string(),
808                    }),
809                    ..Default::default()
810                };
811                sender.send(req).await.unwrap();
812                request_sent_clone.fetch_add(1, Ordering::Relaxed);
813            }
814        });
815
816        let heartbeat_count = Arc::new(AtomicUsize::new(0));
817        let heartbeat_count_clone = heartbeat_count.clone();
818        let handle = tokio::spawn(async move {
819            while let Some(_resp) = receiver.message().await.unwrap() {
820                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
821            }
822        });
823
824        handle.await.unwrap();
825        //+1 for the initial response
826        assert_eq!(
827            request_sent.load(Ordering::Relaxed) + 1,
828            heartbeat_count.load(Ordering::Relaxed)
829        );
830    }
831
832    #[tokio::test]
833    async fn test_range_get() {
834        let tc = new_client("test_range_get").await;
835        tc.gen_data().await;
836
837        let key = tc.key("key-0");
838        let req = RangeRequest::new().with_key(key.as_slice());
839        let res = tc.client.range(req).await;
840        let mut kvs = res.unwrap().take_kvs();
841        assert_eq!(1, kvs.len());
842        let mut kv = kvs.pop().unwrap();
843        assert_eq!(key, kv.take_key());
844        assert_eq!(b"value-0".to_vec(), kv.take_value());
845    }
846
847    #[tokio::test]
848    async fn test_range_get_prefix() {
849        let tc = new_client("test_range_get_prefix").await;
850        tc.gen_data().await;
851
852        let req = RangeRequest::new().with_prefix(tc.key("key-"));
853        let res = tc.client.range(req).await;
854        let kvs = res.unwrap().take_kvs();
855        assert_eq!(10, kvs.len());
856        for (i, mut kv) in kvs.into_iter().enumerate() {
857            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
858            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
859        }
860    }
861
862    #[tokio::test]
863    async fn test_range() {
864        let tc = new_client("test_range").await;
865        tc.gen_data().await;
866
867        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
868        let res = tc.client.range(req).await;
869        let kvs = res.unwrap().take_kvs();
870        assert_eq!(3, kvs.len());
871        for (i, mut kv) in kvs.into_iter().enumerate() {
872            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
873            assert_eq!(
874                format!("{}-{}", "value", i + 5).into_bytes(),
875                kv.take_value()
876            );
877        }
878    }
879
880    #[tokio::test]
881    async fn test_range_keys_only() {
882        let tc = new_client("test_range_keys_only").await;
883        tc.gen_data().await;
884
885        let req = RangeRequest::new()
886            .with_range(tc.key("key-5"), tc.key("key-8"))
887            .with_keys_only();
888        let res = tc.client.range(req).await;
889        let kvs = res.unwrap().take_kvs();
890        assert_eq!(3, kvs.len());
891        for (i, mut kv) in kvs.into_iter().enumerate() {
892            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
893            assert!(kv.take_value().is_empty());
894        }
895    }
896
897    #[tokio::test]
898    async fn test_put() {
899        let tc = new_client("test_put").await;
900
901        let req = PutRequest::new()
902            .with_key(tc.key("key"))
903            .with_value(b"value".to_vec());
904        let res = tc.client.put(req).await;
905        assert!(res.unwrap().prev_kv.is_none());
906    }
907
908    #[tokio::test]
909    async fn test_put_with_prev_kv() {
910        let tc = new_client("test_put_with_prev_kv").await;
911
912        let key = tc.key("key");
913        let req = PutRequest::new()
914            .with_key(key.as_slice())
915            .with_value(b"value".to_vec())
916            .with_prev_kv();
917        let res = tc.client.put(req).await;
918        assert!(res.unwrap().prev_kv.is_none());
919
920        let req = PutRequest::new()
921            .with_key(key.as_slice())
922            .with_value(b"value1".to_vec())
923            .with_prev_kv();
924        let res = tc.client.put(req).await;
925        let mut kv = res.unwrap().prev_kv.unwrap();
926        assert_eq!(key, kv.take_key());
927        assert_eq!(b"value".to_vec(), kv.take_value());
928    }
929
930    #[tokio::test]
931    async fn test_batch_put() {
932        let tc = new_client("test_batch_put").await;
933
934        let mut req = BatchPutRequest::new();
935        for i in 0..275 {
936            req = req.add_kv(
937                tc.key(&format!("key-{}", i)),
938                format!("value-{}", i).into_bytes(),
939            );
940        }
941
942        let res = tc.client.batch_put(req).await;
943        assert_eq!(0, res.unwrap().take_prev_kvs().len());
944
945        let req = RangeRequest::new().with_prefix(tc.key("key-"));
946        let res = tc.client.range(req).await;
947        let kvs = res.unwrap().take_kvs();
948        assert_eq!(275, kvs.len());
949    }
950
951    #[tokio::test]
952    async fn test_batch_get() {
953        let tc = new_client("test_batch_get").await;
954        tc.gen_data().await;
955
956        let mut req = BatchGetRequest::default();
957        for i in 0..256 {
958            req = req.add_key(tc.key(&format!("key-{}", i)));
959        }
960        let res = tc.client.batch_get(req).await.unwrap();
961        assert_eq!(10, res.kvs.len());
962
963        let req = BatchGetRequest::default()
964            .add_key(tc.key("key-1"))
965            .add_key(tc.key("key-999"));
966        let res = tc.client.batch_get(req).await.unwrap();
967        assert_eq!(1, res.kvs.len());
968    }
969
970    #[tokio::test]
971    async fn test_batch_put_with_prev_kv() {
972        let tc = new_client("test_batch_put_with_prev_kv").await;
973
974        let key = tc.key("key");
975        let key2 = tc.key("key2");
976        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
977        let res = tc.client.batch_put(req).await;
978        assert_eq!(0, res.unwrap().take_prev_kvs().len());
979
980        let req = BatchPutRequest::new()
981            .add_kv(key.as_slice(), b"value-".to_vec())
982            .add_kv(key2.as_slice(), b"value2-".to_vec())
983            .with_prev_kv();
984        let res = tc.client.batch_put(req).await;
985        let mut kvs = res.unwrap().take_prev_kvs();
986        assert_eq!(1, kvs.len());
987        let mut kv = kvs.pop().unwrap();
988        assert_eq!(key, kv.take_key());
989        assert_eq!(b"value".to_vec(), kv.take_value());
990    }
991
992    #[tokio::test]
993    async fn test_compare_and_put() {
994        let tc = new_client("test_compare_and_put").await;
995
996        let key = tc.key("key");
997        let req = CompareAndPutRequest::new()
998            .with_key(key.as_slice())
999            .with_expect(b"expect".to_vec())
1000            .with_value(b"value".to_vec());
1001        let res = tc.client.compare_and_put(req).await;
1002        assert!(!res.unwrap().is_success());
1003
1004        // create if absent
1005        let req = CompareAndPutRequest::new()
1006            .with_key(key.as_slice())
1007            .with_value(b"value".to_vec());
1008        let res = tc.client.compare_and_put(req).await;
1009        let mut res = res.unwrap();
1010        assert!(res.is_success());
1011        assert!(res.take_prev_kv().is_none());
1012
1013        // compare and put fail
1014        let req = CompareAndPutRequest::new()
1015            .with_key(key.as_slice())
1016            .with_expect(b"not_eq".to_vec())
1017            .with_value(b"value2".to_vec());
1018        let res = tc.client.compare_and_put(req).await;
1019        let mut res = res.unwrap();
1020        assert!(!res.is_success());
1021        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1022
1023        // compare and put success
1024        let req = CompareAndPutRequest::new()
1025            .with_key(key.as_slice())
1026            .with_expect(b"value".to_vec())
1027            .with_value(b"value2".to_vec());
1028        let res = tc.client.compare_and_put(req).await;
1029        let mut res = res.unwrap();
1030        assert!(res.is_success());
1031
1032        // If compare-and-put is success, previous value doesn't need to be returned.
1033        assert!(res.take_prev_kv().is_none());
1034    }
1035
1036    #[tokio::test]
1037    async fn test_delete_with_key() {
1038        let tc = new_client("test_delete_with_key").await;
1039        tc.gen_data().await;
1040
1041        let req = DeleteRangeRequest::new()
1042            .with_key(tc.key("key-0"))
1043            .with_prev_kv();
1044        let res = tc.client.delete_range(req).await;
1045        let mut res = res.unwrap();
1046        assert_eq!(1, res.deleted());
1047        let mut kvs = res.take_prev_kvs();
1048        assert_eq!(1, kvs.len());
1049        let mut kv = kvs.pop().unwrap();
1050        assert_eq!(b"value-0".to_vec(), kv.take_value());
1051    }
1052
1053    #[tokio::test]
1054    async fn test_delete_with_prefix() {
1055        let tc = new_client("test_delete_with_prefix").await;
1056        tc.gen_data().await;
1057
1058        let req = DeleteRangeRequest::new()
1059            .with_prefix(tc.key("key-"))
1060            .with_prev_kv();
1061        let res = tc.client.delete_range(req).await;
1062        let mut res = res.unwrap();
1063        assert_eq!(10, res.deleted());
1064        let kvs = res.take_prev_kvs();
1065        assert_eq!(10, kvs.len());
1066        for (i, mut kv) in kvs.into_iter().enumerate() {
1067            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1068        }
1069    }
1070
1071    #[tokio::test]
1072    async fn test_delete_with_range() {
1073        let tc = new_client("test_delete_with_range").await;
1074        tc.gen_data().await;
1075
1076        let req = DeleteRangeRequest::new()
1077            .with_range(tc.key("key-2"), tc.key("key-7"))
1078            .with_prev_kv();
1079        let res = tc.client.delete_range(req).await;
1080        let mut res = res.unwrap();
1081        assert_eq!(5, res.deleted());
1082        let kvs = res.take_prev_kvs();
1083        assert_eq!(5, kvs.len());
1084        for (i, mut kv) in kvs.into_iter().enumerate() {
1085            assert_eq!(
1086                format!("{}-{}", "value", i + 2).into_bytes(),
1087                kv.take_value()
1088            );
1089        }
1090    }
1091
    /// Decoder stub for tests: ignores the given key-value and always returns `Ok(())`.
    ///
    /// Useful where only the number of decoded items matters, not their content.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }
1095
1096    #[tokio::test]
1097    async fn test_cluster_client_adaptive_range() {
1098        let tx = new_client("test_cluster_client").await;
1099        let in_memory = tx.in_memory().unwrap();
1100        let cluster_client = tx.client.cluster_client().unwrap();
1101        let mut rng = rand::rng();
1102
1103        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1104        for i in 0..10 {
1105            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1106            in_memory
1107                .put(
1108                    PutRequest::new()
1109                        .with_key(format!("__prefix/{i}").as_bytes())
1110                        .with_value(data.clone()),
1111                )
1112                .await
1113                .unwrap();
1114        }
1115
1116        let req = RangeRequest::new().with_prefix(b"__prefix/");
1117        let stream =
1118            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1119
1120        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1121        assert_eq!(10, res.len());
1122    }
1123}