meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, Role};
28pub use ask_leader::AskLeader;
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
38use common_meta::error::{
39    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
40};
41use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46    AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47    RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65    Result,
66};
67
68pub type Id = u64;
69
70const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
71const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
72const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
73
74#[derive(Clone, Debug, Default)]
75pub struct MetaClientBuilder {
76    id: Id,
77    role: Role,
78    enable_heartbeat: bool,
79    enable_store: bool,
80    enable_procedure: bool,
81    enable_access_cluster_info: bool,
82    region_follower: Option<RegionFollowerClientRef>,
83    channel_manager: Option<ChannelManager>,
84    ddl_channel_manager: Option<ChannelManager>,
85    heartbeat_channel_manager: Option<ChannelManager>,
86}
87
88impl MetaClientBuilder {
89    pub fn new(member_id: u64, role: Role) -> Self {
90        Self {
91            id: member_id,
92            role,
93            ..Default::default()
94        }
95    }
96
97    /// Returns the role of Frontend's default options.
98    pub fn frontend_default_options() -> Self {
99        // Frontend does not need a member id.
100        Self::new(0, Role::Frontend)
101            .enable_store()
102            .enable_heartbeat()
103            .enable_procedure()
104            .enable_access_cluster_info()
105    }
106
107    /// Returns the role of Datanode's default options.
108    pub fn datanode_default_options(member_id: u64) -> Self {
109        Self::new(member_id, Role::Datanode)
110            .enable_store()
111            .enable_heartbeat()
112    }
113
114    /// Returns the role of Flownode's default options.
115    pub fn flownode_default_options(member_id: u64) -> Self {
116        Self::new(member_id, Role::Flownode)
117            .enable_store()
118            .enable_heartbeat()
119            .enable_procedure()
120            .enable_access_cluster_info()
121    }
122
123    pub fn enable_heartbeat(self) -> Self {
124        Self {
125            enable_heartbeat: true,
126            ..self
127        }
128    }
129
130    pub fn enable_store(self) -> Self {
131        Self {
132            enable_store: true,
133            ..self
134        }
135    }
136
137    pub fn enable_procedure(self) -> Self {
138        Self {
139            enable_procedure: true,
140            ..self
141        }
142    }
143
144    pub fn enable_access_cluster_info(self) -> Self {
145        Self {
146            enable_access_cluster_info: true,
147            ..self
148        }
149    }
150
151    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152        Self {
153            channel_manager: Some(channel_manager),
154            ..self
155        }
156    }
157
158    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159        Self {
160            ddl_channel_manager: Some(channel_manager),
161            ..self
162        }
163    }
164
165    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166        Self {
167            heartbeat_channel_manager: Some(channel_manager),
168            ..self
169        }
170    }
171
172    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173        Self {
174            region_follower: Some(region_follower),
175            ..self
176        }
177    }
178
179    pub fn build(self) -> MetaClient {
180        let mut client = if let Some(mgr) = self.channel_manager {
181            MetaClient::with_channel_manager(self.id, mgr)
182        } else {
183            MetaClient::new(self.id)
184        };
185
186        let mgr = client.channel_manager.clone();
187
188        if self.enable_heartbeat {
189            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190            client.heartbeat = Some(HeartbeatClient::new(
191                self.id,
192                self.role,
193                mgr,
194                DEFAULT_ASK_LEADER_MAX_RETRY,
195            ));
196        }
197
198        if self.enable_store {
199            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200        }
201
202        if self.enable_procedure {
203            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204            client.procedure = Some(ProcedureClient::new(
205                self.id,
206                self.role,
207                mgr,
208                DEFAULT_SUBMIT_DDL_MAX_RETRY,
209            ));
210        }
211
212        if self.enable_access_cluster_info {
213            client.cluster = Some(ClusterClient::new(
214                self.id,
215                self.role,
216                mgr,
217                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218            ))
219        }
220
221        if let Some(region_follower) = self.region_follower {
222            client.region_follower = Some(region_follower);
223        }
224
225        client
226    }
227}
228
229#[derive(Debug, Default)]
230pub struct MetaClient {
231    id: Id,
232    channel_manager: ChannelManager,
233    heartbeat: Option<HeartbeatClient>,
234    store: Option<StoreClient>,
235    procedure: Option<ProcedureClient>,
236    cluster: Option<ClusterClient>,
237    region_follower: Option<RegionFollowerClientRef>,
238}
239
240pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
241
242/// A trait for clients that can manage region followers.
243#[async_trait::async_trait]
244pub trait RegionFollowerClient: Sync + Send + Debug {
245    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
246
247    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
248
249    async fn start(&self, urls: &[&str]) -> Result<()>;
250}
251
252#[async_trait::async_trait]
253impl ProcedureExecutor for MetaClient {
254    async fn submit_ddl_task(
255        &self,
256        _ctx: &ExecutorContext,
257        request: SubmitDdlTaskRequest,
258    ) -> MetaResult<SubmitDdlTaskResponse> {
259        self.submit_ddl_task(request)
260            .await
261            .map_err(BoxedError::new)
262            .context(meta_error::ExternalSnafu)
263    }
264
265    async fn migrate_region(
266        &self,
267        _ctx: &ExecutorContext,
268        request: MigrateRegionRequest,
269    ) -> MetaResult<MigrateRegionResponse> {
270        self.migrate_region(request)
271            .await
272            .map_err(BoxedError::new)
273            .context(meta_error::ExternalSnafu)
274    }
275
276    async fn add_region_follower(
277        &self,
278        _ctx: &ExecutorContext,
279        request: AddRegionFollowerRequest,
280    ) -> MetaResult<()> {
281        if let Some(region_follower) = &self.region_follower {
282            region_follower
283                .add_region_follower(request)
284                .await
285                .map_err(BoxedError::new)
286                .context(meta_error::ExternalSnafu)
287        } else {
288            UnsupportedSnafu {
289                operation: "add_region_follower",
290            }
291            .fail()
292        }
293    }
294
295    async fn remove_region_follower(
296        &self,
297        _ctx: &ExecutorContext,
298        request: RemoveRegionFollowerRequest,
299    ) -> MetaResult<()> {
300        if let Some(region_follower) = &self.region_follower {
301            region_follower
302                .remove_region_follower(request)
303                .await
304                .map_err(BoxedError::new)
305                .context(meta_error::ExternalSnafu)
306        } else {
307            UnsupportedSnafu {
308                operation: "remove_region_follower",
309            }
310            .fail()
311        }
312    }
313
314    async fn query_procedure_state(
315        &self,
316        _ctx: &ExecutorContext,
317        pid: &str,
318    ) -> MetaResult<ProcedureStateResponse> {
319        self.query_procedure_state(pid)
320            .await
321            .map_err(BoxedError::new)
322            .context(meta_error::ExternalSnafu)
323    }
324
325    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
326        self.procedure_client()
327            .map_err(BoxedError::new)
328            .context(meta_error::ExternalSnafu)?
329            .list_procedures()
330            .await
331            .map_err(BoxedError::new)
332            .context(meta_error::ExternalSnafu)
333    }
334}
335
336#[async_trait::async_trait]
337impl ClusterInfo for MetaClient {
338    type Error = Error;
339
340    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
341        let cluster_client = self.cluster_client()?;
342
343        let (get_metasrv_nodes, nodes_key_prefix) = match role {
344            None => (true, Some(NodeInfoKey::key_prefix())),
345            Some(ClusterRole::Metasrv) => (true, None),
346            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
347        };
348
349        let mut nodes = if get_metasrv_nodes {
350            let last_activity_ts = -1; // Metasrv does not provide this information.
351
352            let (leader, followers) = cluster_client.get_metasrv_peers().await?;
353            followers
354                .into_iter()
355                .map(|node| NodeInfo {
356                    peer: node.peer.unwrap_or_default(),
357                    last_activity_ts,
358                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
359                    version: node.version,
360                    git_commit: node.git_commit,
361                    start_time_ms: node.start_time_ms,
362                })
363                .chain(leader.into_iter().map(|node| NodeInfo {
364                    peer: node.peer.unwrap_or_default(),
365                    last_activity_ts,
366                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
367                    version: node.version,
368                    git_commit: node.git_commit,
369                    start_time_ms: node.start_time_ms,
370                }))
371                .collect::<Vec<_>>()
372        } else {
373            Vec::new()
374        };
375
376        if let Some(prefix) = nodes_key_prefix {
377            let req = RangeRequest::new().with_prefix(prefix);
378            let res = cluster_client.range(req).await?;
379            for kv in res.kvs {
380                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
381            }
382        }
383
384        Ok(nodes)
385    }
386
387    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
388        let cluster_kv_backend = Arc::new(self.cluster_client()?);
389        let range_prefix = DatanodeStatKey::prefix_key();
390        let req = RangeRequest::new().with_prefix(range_prefix);
391        let stream =
392            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
393        let mut datanode_stats = stream
394            .try_collect::<Vec<_>>()
395            .await
396            .context(ConvertMetaResponseSnafu)?;
397        let region_stats = datanode_stats
398            .iter_mut()
399            .flat_map(|datanode_stat| {
400                let last = datanode_stat.stats.pop();
401                last.map(|stat| stat.region_stats).unwrap_or_default()
402            })
403            .collect::<Vec<_>>();
404
405        Ok(region_stats)
406    }
407
408    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
409        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
410        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
411        let flow_state_manager = FlowStateManager::new(cluster_backend);
412        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
413
414        Ok(res.map(|r| r.into()))
415    }
416}
417
418fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
419    DatanodeStatValue::try_from(kv.value)
420        .map_err(BoxedError::new)
421        .context(ExternalSnafu)
422}
423
424impl MetaClient {
425    pub fn new(id: Id) -> Self {
426        Self {
427            id,
428            ..Default::default()
429        }
430    }
431
432    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
433        Self {
434            id,
435            channel_manager,
436            ..Default::default()
437        }
438    }
439
440    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
441    where
442        U: AsRef<str>,
443        A: AsRef<[U]> + Clone,
444    {
445        info!("MetaClient channel config: {:?}", self.channel_config());
446
447        if let Some(client) = &mut self.region_follower {
448            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
449            client.start(&urls).await?;
450            info!("Region follower client started");
451        }
452        if let Some(client) = &mut self.heartbeat {
453            client.start(urls.clone()).await?;
454            info!("Heartbeat client started");
455        }
456        if let Some(client) = &mut self.store {
457            client.start(urls.clone()).await?;
458            info!("Store client started");
459        }
460        if let Some(client) = &mut self.procedure {
461            client.start(urls.clone()).await?;
462            info!("DDL client started");
463        }
464        if let Some(client) = &mut self.cluster {
465            client.start(urls).await?;
466            info!("Cluster client started");
467        }
468
469        Ok(())
470    }
471
472    /// Ask the leader address of `metasrv`, and the heartbeat component
473    /// needs to create a bidirectional streaming to the leader.
474    pub async fn ask_leader(&self) -> Result<String> {
475        self.heartbeat_client()?.ask_leader().await
476    }
477
478    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
479    /// other end is the leader of `metasrv`.
480    ///
481    /// The `datanode` needs to use the sender to continuously send heartbeat
482    /// packets (some self-state data), and the receiver can receive a response
483    /// from "metasrv" (which may contain some scheduling instructions).
484    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
485        self.heartbeat_client()?.heartbeat().await
486    }
487
488    /// Range gets the keys in the range from the key-value store.
489    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
490        self.store_client()?
491            .range(req.into())
492            .await?
493            .try_into()
494            .context(ConvertMetaResponseSnafu)
495    }
496
497    /// Put puts the given key into the key-value store.
498    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
499        self.store_client()?
500            .put(req.into())
501            .await?
502            .try_into()
503            .context(ConvertMetaResponseSnafu)
504    }
505
506    /// BatchGet atomically get values by the given keys from the key-value store.
507    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
508        self.store_client()?
509            .batch_get(req.into())
510            .await?
511            .try_into()
512            .context(ConvertMetaResponseSnafu)
513    }
514
515    /// BatchPut atomically puts the given keys into the key-value store.
516    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
517        self.store_client()?
518            .batch_put(req.into())
519            .await?
520            .try_into()
521            .context(ConvertMetaResponseSnafu)
522    }
523
524    /// BatchDelete atomically deletes the given keys from the key-value store.
525    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
526        self.store_client()?
527            .batch_delete(req.into())
528            .await?
529            .try_into()
530            .context(ConvertMetaResponseSnafu)
531    }
532
533    /// CompareAndPut atomically puts the value to the given updated
534    /// value if the current value == the expected value.
535    pub async fn compare_and_put(
536        &self,
537        req: CompareAndPutRequest,
538    ) -> Result<CompareAndPutResponse> {
539        self.store_client()?
540            .compare_and_put(req.into())
541            .await?
542            .try_into()
543            .context(ConvertMetaResponseSnafu)
544    }
545
546    /// DeleteRange deletes the given range from the key-value store.
547    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
548        self.store_client()?
549            .delete_range(req.into())
550            .await?
551            .try_into()
552            .context(ConvertMetaResponseSnafu)
553    }
554
555    /// Query the procedure state by its id.
556    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
557        self.procedure_client()?.query_procedure_state(pid).await
558    }
559
560    /// Submit a region migration task.
561    pub async fn migrate_region(
562        &self,
563        request: MigrateRegionRequest,
564    ) -> Result<MigrateRegionResponse> {
565        self.procedure_client()?
566            .migrate_region(
567                request.region_id,
568                request.from_peer,
569                request.to_peer,
570                request.timeout,
571            )
572            .await
573    }
574
575    /// Submit a DDL task
576    pub async fn submit_ddl_task(
577        &self,
578        req: SubmitDdlTaskRequest,
579    ) -> Result<SubmitDdlTaskResponse> {
580        let res = self
581            .procedure_client()?
582            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
583            .await?
584            .try_into()
585            .context(ConvertMetaResponseSnafu)?;
586
587        Ok(res)
588    }
589
590    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
591        self.heartbeat.clone().context(NotStartedSnafu {
592            name: "heartbeat_client",
593        })
594    }
595
596    pub fn store_client(&self) -> Result<StoreClient> {
597        self.store.clone().context(NotStartedSnafu {
598            name: "store_client",
599        })
600    }
601
602    pub fn procedure_client(&self) -> Result<ProcedureClient> {
603        self.procedure.clone().context(NotStartedSnafu {
604            name: "procedure_client",
605        })
606    }
607
608    pub fn cluster_client(&self) -> Result<ClusterClient> {
609        self.cluster.clone().context(NotStartedSnafu {
610            name: "cluster_client",
611        })
612    }
613
614    pub fn channel_config(&self) -> &ChannelConfig {
615        self.channel_manager.config()
616    }
617
618    pub fn id(&self) -> Id {
619        self.id
620    }
621}
622
623#[cfg(test)]
624mod tests {
625    use std::sync::atomic::{AtomicUsize, Ordering};
626
627    use api::v1::meta::{HeartbeatRequest, Peer};
628    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
629    use rand::Rng;
630
631    use super::*;
632    use crate::error;
633    use crate::mocks::{self, MockMetaContext};
634
635    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
636
637    struct TestClient {
638        ns: String,
639        client: MetaClient,
640        meta_ctx: MockMetaContext,
641    }
642
643    impl TestClient {
644        async fn new(ns: impl Into<String>) -> Self {
645            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
646            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
647            Self {
648                ns: ns.into(),
649                client,
650                meta_ctx,
651            }
652        }
653
654        fn key(&self, name: &str) -> Vec<u8> {
655            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
656        }
657
658        async fn gen_data(&self) {
659            for i in 0..10 {
660                let req = PutRequest::new()
661                    .with_key(self.key(&format!("key-{i}")))
662                    .with_value(format!("{}-{}", "value", i).into_bytes())
663                    .with_prev_kv();
664                let res = self.client.put(req).await;
665                let _ = res.unwrap();
666            }
667        }
668
669        async fn clear_data(&self) {
670            let req =
671                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
672            let res = self.client.delete_range(req).await;
673            let _ = res.unwrap();
674        }
675
676        #[allow(dead_code)]
677        fn kv_backend(&self) -> KvBackendRef {
678            self.meta_ctx.kv_backend.clone()
679        }
680
681        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
682            self.meta_ctx.in_memory.clone()
683        }
684    }
685
686    async fn new_client(ns: impl Into<String>) -> TestClient {
687        let client = TestClient::new(ns).await;
688        client.clear_data().await;
689        client
690    }
691
692    #[tokio::test]
693    async fn test_meta_client_builder() {
694        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
695
696        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
697            .enable_heartbeat()
698            .build();
699        let _ = meta_client.heartbeat_client().unwrap();
700        assert!(meta_client.store_client().is_err());
701        meta_client.start(urls).await.unwrap();
702
703        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
704        assert!(meta_client.heartbeat_client().is_err());
705        assert!(meta_client.store_client().is_err());
706        meta_client.start(urls).await.unwrap();
707
708        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
709            .enable_store()
710            .build();
711        assert!(meta_client.heartbeat_client().is_err());
712        let _ = meta_client.store_client().unwrap();
713        meta_client.start(urls).await.unwrap();
714
715        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
716            .enable_heartbeat()
717            .enable_store()
718            .build();
719        assert_eq!(2, meta_client.id());
720        assert_eq!(2, meta_client.id());
721        let _ = meta_client.heartbeat_client().unwrap();
722        let _ = meta_client.store_client().unwrap();
723        meta_client.start(urls).await.unwrap();
724    }
725
726    #[tokio::test]
727    async fn test_not_start_heartbeat_client() {
728        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
729        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
730            .enable_store()
731            .build();
732        meta_client.start(urls).await.unwrap();
733        let res = meta_client.ask_leader().await;
734        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
735    }
736
737    #[tokio::test]
738    async fn test_not_start_store_client() {
739        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
740        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
741            .enable_heartbeat()
742            .build();
743
744        meta_client.start(urls).await.unwrap();
745        let res = meta_client.put(PutRequest::default()).await;
746        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
747    }
748
749    #[tokio::test]
750    async fn test_ask_leader() {
751        let tc = new_client("test_ask_leader").await;
752        tc.client.ask_leader().await.unwrap();
753    }
754
755    #[tokio::test]
756    async fn test_heartbeat() {
757        let tc = new_client("test_heartbeat").await;
758        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
759        // send heartbeats
760
761        let request_sent = Arc::new(AtomicUsize::new(0));
762        let request_sent_clone = request_sent.clone();
763        let _handle = tokio::spawn(async move {
764            for _ in 0..5 {
765                let req = HeartbeatRequest {
766                    peer: Some(Peer {
767                        id: 1,
768                        addr: "meta_client_peer".to_string(),
769                    }),
770                    ..Default::default()
771                };
772                sender.send(req).await.unwrap();
773                request_sent_clone.fetch_add(1, Ordering::Relaxed);
774            }
775        });
776
777        let heartbeat_count = Arc::new(AtomicUsize::new(0));
778        let heartbeat_count_clone = heartbeat_count.clone();
779        let handle = tokio::spawn(async move {
780            while let Some(_resp) = receiver.message().await.unwrap() {
781                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
782            }
783        });
784
785        handle.await.unwrap();
786        //+1 for the initial response
787        assert_eq!(
788            request_sent.load(Ordering::Relaxed) + 1,
789            heartbeat_count.load(Ordering::Relaxed)
790        );
791    }
792
793    #[tokio::test]
794    async fn test_range_get() {
795        let tc = new_client("test_range_get").await;
796        tc.gen_data().await;
797
798        let key = tc.key("key-0");
799        let req = RangeRequest::new().with_key(key.as_slice());
800        let res = tc.client.range(req).await;
801        let mut kvs = res.unwrap().take_kvs();
802        assert_eq!(1, kvs.len());
803        let mut kv = kvs.pop().unwrap();
804        assert_eq!(key, kv.take_key());
805        assert_eq!(b"value-0".to_vec(), kv.take_value());
806    }
807
808    #[tokio::test]
809    async fn test_range_get_prefix() {
810        let tc = new_client("test_range_get_prefix").await;
811        tc.gen_data().await;
812
813        let req = RangeRequest::new().with_prefix(tc.key("key-"));
814        let res = tc.client.range(req).await;
815        let kvs = res.unwrap().take_kvs();
816        assert_eq!(10, kvs.len());
817        for (i, mut kv) in kvs.into_iter().enumerate() {
818            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
819            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
820        }
821    }
822
823    #[tokio::test]
824    async fn test_range() {
825        let tc = new_client("test_range").await;
826        tc.gen_data().await;
827
828        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
829        let res = tc.client.range(req).await;
830        let kvs = res.unwrap().take_kvs();
831        assert_eq!(3, kvs.len());
832        for (i, mut kv) in kvs.into_iter().enumerate() {
833            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
834            assert_eq!(
835                format!("{}-{}", "value", i + 5).into_bytes(),
836                kv.take_value()
837            );
838        }
839    }
840
841    #[tokio::test]
842    async fn test_range_keys_only() {
843        let tc = new_client("test_range_keys_only").await;
844        tc.gen_data().await;
845
846        let req = RangeRequest::new()
847            .with_range(tc.key("key-5"), tc.key("key-8"))
848            .with_keys_only();
849        let res = tc.client.range(req).await;
850        let kvs = res.unwrap().take_kvs();
851        assert_eq!(3, kvs.len());
852        for (i, mut kv) in kvs.into_iter().enumerate() {
853            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
854            assert!(kv.take_value().is_empty());
855        }
856    }
857
858    #[tokio::test]
859    async fn test_put() {
860        let tc = new_client("test_put").await;
861
862        let req = PutRequest::new()
863            .with_key(tc.key("key"))
864            .with_value(b"value".to_vec());
865        let res = tc.client.put(req).await;
866        assert!(res.unwrap().prev_kv.is_none());
867    }
868
869    #[tokio::test]
870    async fn test_put_with_prev_kv() {
871        let tc = new_client("test_put_with_prev_kv").await;
872
873        let key = tc.key("key");
874        let req = PutRequest::new()
875            .with_key(key.as_slice())
876            .with_value(b"value".to_vec())
877            .with_prev_kv();
878        let res = tc.client.put(req).await;
879        assert!(res.unwrap().prev_kv.is_none());
880
881        let req = PutRequest::new()
882            .with_key(key.as_slice())
883            .with_value(b"value1".to_vec())
884            .with_prev_kv();
885        let res = tc.client.put(req).await;
886        let mut kv = res.unwrap().prev_kv.unwrap();
887        assert_eq!(key, kv.take_key());
888        assert_eq!(b"value".to_vec(), kv.take_value());
889    }
890
891    #[tokio::test]
892    async fn test_batch_put() {
893        let tc = new_client("test_batch_put").await;
894
895        let mut req = BatchPutRequest::new();
896        for i in 0..275 {
897            req = req.add_kv(
898                tc.key(&format!("key-{}", i)),
899                format!("value-{}", i).into_bytes(),
900            );
901        }
902
903        let res = tc.client.batch_put(req).await;
904        assert_eq!(0, res.unwrap().take_prev_kvs().len());
905
906        let req = RangeRequest::new().with_prefix(tc.key("key-"));
907        let res = tc.client.range(req).await;
908        let kvs = res.unwrap().take_kvs();
909        assert_eq!(275, kvs.len());
910    }
911
912    #[tokio::test]
913    async fn test_batch_get() {
914        let tc = new_client("test_batch_get").await;
915        tc.gen_data().await;
916
917        let mut req = BatchGetRequest::default();
918        for i in 0..256 {
919            req = req.add_key(tc.key(&format!("key-{}", i)));
920        }
921        let res = tc.client.batch_get(req).await.unwrap();
922        assert_eq!(10, res.kvs.len());
923
924        let req = BatchGetRequest::default()
925            .add_key(tc.key("key-1"))
926            .add_key(tc.key("key-999"));
927        let res = tc.client.batch_get(req).await.unwrap();
928        assert_eq!(1, res.kvs.len());
929    }
930
931    #[tokio::test]
932    async fn test_batch_put_with_prev_kv() {
933        let tc = new_client("test_batch_put_with_prev_kv").await;
934
935        let key = tc.key("key");
936        let key2 = tc.key("key2");
937        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
938        let res = tc.client.batch_put(req).await;
939        assert_eq!(0, res.unwrap().take_prev_kvs().len());
940
941        let req = BatchPutRequest::new()
942            .add_kv(key.as_slice(), b"value-".to_vec())
943            .add_kv(key2.as_slice(), b"value2-".to_vec())
944            .with_prev_kv();
945        let res = tc.client.batch_put(req).await;
946        let mut kvs = res.unwrap().take_prev_kvs();
947        assert_eq!(1, kvs.len());
948        let mut kv = kvs.pop().unwrap();
949        assert_eq!(key, kv.take_key());
950        assert_eq!(b"value".to_vec(), kv.take_value());
951    }
952
953    #[tokio::test]
954    async fn test_compare_and_put() {
955        let tc = new_client("test_compare_and_put").await;
956
957        let key = tc.key("key");
958        let req = CompareAndPutRequest::new()
959            .with_key(key.as_slice())
960            .with_expect(b"expect".to_vec())
961            .with_value(b"value".to_vec());
962        let res = tc.client.compare_and_put(req).await;
963        assert!(!res.unwrap().is_success());
964
965        // create if absent
966        let req = CompareAndPutRequest::new()
967            .with_key(key.as_slice())
968            .with_value(b"value".to_vec());
969        let res = tc.client.compare_and_put(req).await;
970        let mut res = res.unwrap();
971        assert!(res.is_success());
972        assert!(res.take_prev_kv().is_none());
973
974        // compare and put fail
975        let req = CompareAndPutRequest::new()
976            .with_key(key.as_slice())
977            .with_expect(b"not_eq".to_vec())
978            .with_value(b"value2".to_vec());
979        let res = tc.client.compare_and_put(req).await;
980        let mut res = res.unwrap();
981        assert!(!res.is_success());
982        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
983
984        // compare and put success
985        let req = CompareAndPutRequest::new()
986            .with_key(key.as_slice())
987            .with_expect(b"value".to_vec())
988            .with_value(b"value2".to_vec());
989        let res = tc.client.compare_and_put(req).await;
990        let mut res = res.unwrap();
991        assert!(res.is_success());
992
993        // If compare-and-put is success, previous value doesn't need to be returned.
994        assert!(res.take_prev_kv().is_none());
995    }
996
997    #[tokio::test]
998    async fn test_delete_with_key() {
999        let tc = new_client("test_delete_with_key").await;
1000        tc.gen_data().await;
1001
1002        let req = DeleteRangeRequest::new()
1003            .with_key(tc.key("key-0"))
1004            .with_prev_kv();
1005        let res = tc.client.delete_range(req).await;
1006        let mut res = res.unwrap();
1007        assert_eq!(1, res.deleted());
1008        let mut kvs = res.take_prev_kvs();
1009        assert_eq!(1, kvs.len());
1010        let mut kv = kvs.pop().unwrap();
1011        assert_eq!(b"value-0".to_vec(), kv.take_value());
1012    }
1013
1014    #[tokio::test]
1015    async fn test_delete_with_prefix() {
1016        let tc = new_client("test_delete_with_prefix").await;
1017        tc.gen_data().await;
1018
1019        let req = DeleteRangeRequest::new()
1020            .with_prefix(tc.key("key-"))
1021            .with_prev_kv();
1022        let res = tc.client.delete_range(req).await;
1023        let mut res = res.unwrap();
1024        assert_eq!(10, res.deleted());
1025        let kvs = res.take_prev_kvs();
1026        assert_eq!(10, kvs.len());
1027        for (i, mut kv) in kvs.into_iter().enumerate() {
1028            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1029        }
1030    }
1031
1032    #[tokio::test]
1033    async fn test_delete_with_range() {
1034        let tc = new_client("test_delete_with_range").await;
1035        tc.gen_data().await;
1036
1037        let req = DeleteRangeRequest::new()
1038            .with_range(tc.key("key-2"), tc.key("key-7"))
1039            .with_prev_kv();
1040        let res = tc.client.delete_range(req).await;
1041        let mut res = res.unwrap();
1042        assert_eq!(5, res.deleted());
1043        let kvs = res.take_prev_kvs();
1044        assert_eq!(5, kvs.len());
1045        for (i, mut kv) in kvs.into_iter().enumerate() {
1046            assert_eq!(
1047                format!("{}-{}", "value", i + 2).into_bytes(),
1048                kv.take_value()
1049            );
1050        }
1051    }
1052
1053    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1054        Ok(())
1055    }
1056
1057    #[tokio::test]
1058    async fn test_cluster_client_adaptive_range() {
1059        let tx = new_client("test_cluster_client").await;
1060        let in_memory = tx.in_memory().unwrap();
1061        let cluster_client = tx.client.cluster_client().unwrap();
1062        let mut rng = rand::rng();
1063
1064        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1065        for i in 0..10 {
1066            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1067            in_memory
1068                .put(
1069                    PutRequest::new()
1070                        .with_key(format!("__prefix/{i}").as_bytes())
1071                        .with_value(data.clone()),
1072                )
1073                .await
1074                .unwrap();
1075        }
1076
1077        let req = RangeRequest::new().with_prefix(b"__prefix/");
1078        let stream =
1079            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1080
1081        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1082        assert_eq!(10, res.len());
1083    }
1084}