meta_client/
client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34    ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::error::{
38    self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
39};
40use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
41use common_meta::kv_backend::KvBackendRef;
42use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46    AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47    RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64    ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65    Result,
66};
67
/// Identifier of a cluster member; the builder forwards it to the client and
/// to every sub-client it creates.
pub type Id = u64;
69
/// Passed as the last constructor argument of the heartbeat client in
/// `MetaClientBuilder::build` (presumably the ask-leader retry limit — confirm
/// against `heartbeat::Client::new`).
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Passed as the last constructor argument of the procedure client in
/// `MetaClientBuilder::build` (presumably the DDL submission retry limit).
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Passed as the last constructor argument of the cluster client in
/// `MetaClientBuilder::build` (presumably its retry limit).
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
73
/// Builder for [`MetaClient`].
///
/// Each `enable_*` flag decides whether `build` creates the corresponding
/// sub-client. The optional channel managers override the channel manager used
/// by the client as a whole, by the procedure client, or by the heartbeat
/// client respectively.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Member id forwarded to the client and to every sub-client.
    id: Id,
    /// Role forwarded to every sub-client.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client handed to the built `MetaClient`.
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager of the built client; a default one is used when unset.
    channel_manager: Option<ChannelManager>,
    /// Channel manager used only by the procedure client when set.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager used only by the heartbeat client when set.
    heartbeat_channel_manager: Option<ChannelManager>,
}
87
88impl MetaClientBuilder {
89    pub fn new(member_id: u64, role: Role) -> Self {
90        Self {
91            id: member_id,
92            role,
93            ..Default::default()
94        }
95    }
96
97    /// Returns the role of Frontend's default options.
98    pub fn frontend_default_options() -> Self {
99        // Frontend does not need a member id.
100        Self::new(0, Role::Frontend)
101            .enable_store()
102            .enable_heartbeat()
103            .enable_procedure()
104            .enable_access_cluster_info()
105    }
106
107    /// Returns the role of Datanode's default options.
108    pub fn datanode_default_options(member_id: u64) -> Self {
109        Self::new(member_id, Role::Datanode)
110            .enable_store()
111            .enable_heartbeat()
112    }
113
114    /// Returns the role of Flownode's default options.
115    pub fn flownode_default_options(member_id: u64) -> Self {
116        Self::new(member_id, Role::Flownode)
117            .enable_store()
118            .enable_heartbeat()
119            .enable_procedure()
120            .enable_access_cluster_info()
121    }
122
123    pub fn enable_heartbeat(self) -> Self {
124        Self {
125            enable_heartbeat: true,
126            ..self
127        }
128    }
129
130    pub fn enable_store(self) -> Self {
131        Self {
132            enable_store: true,
133            ..self
134        }
135    }
136
137    pub fn enable_procedure(self) -> Self {
138        Self {
139            enable_procedure: true,
140            ..self
141        }
142    }
143
144    pub fn enable_access_cluster_info(self) -> Self {
145        Self {
146            enable_access_cluster_info: true,
147            ..self
148        }
149    }
150
151    pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152        Self {
153            channel_manager: Some(channel_manager),
154            ..self
155        }
156    }
157
158    pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159        Self {
160            ddl_channel_manager: Some(channel_manager),
161            ..self
162        }
163    }
164
165    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166        Self {
167            heartbeat_channel_manager: Some(channel_manager),
168            ..self
169        }
170    }
171
172    pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173        Self {
174            region_follower: Some(region_follower),
175            ..self
176        }
177    }
178
179    pub fn build(self) -> MetaClient {
180        let mut client = if let Some(mgr) = self.channel_manager {
181            MetaClient::with_channel_manager(self.id, mgr)
182        } else {
183            MetaClient::new(self.id)
184        };
185
186        let mgr = client.channel_manager.clone();
187
188        if self.enable_heartbeat {
189            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190            client.heartbeat = Some(HeartbeatClient::new(
191                self.id,
192                self.role,
193                mgr,
194                DEFAULT_ASK_LEADER_MAX_RETRY,
195            ));
196        }
197
198        if self.enable_store {
199            client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200        }
201
202        if self.enable_procedure {
203            let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204            client.procedure = Some(ProcedureClient::new(
205                self.id,
206                self.role,
207                mgr,
208                DEFAULT_SUBMIT_DDL_MAX_RETRY,
209            ));
210        }
211
212        if self.enable_access_cluster_info {
213            client.cluster = Some(ClusterClient::new(
214                self.id,
215                self.role,
216                mgr,
217                DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218            ))
219        }
220
221        if let Some(region_follower) = self.region_follower {
222            client.region_follower = Some(region_follower);
223        }
224
225        client
226    }
227}
228
/// Client facade over the optional sub-clients created by `MetaClientBuilder`.
///
/// A sub-client that was not enabled on the builder stays `None`; the
/// corresponding accessor then fails with a `NotStarted` error.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Member id this client was created with.
    id: Id,
    /// Channel manager of this client; its config is exposed by `channel_config`.
    channel_manager: ChannelManager,
    /// Heartbeat sub-client, if enabled.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store sub-client, if enabled.
    store: Option<StoreClient>,
    /// Procedure sub-client, if enabled.
    procedure: Option<ProcedureClient>,
    /// Cluster sub-client, if enabled.
    cluster: Option<ClusterClient>,
    /// Optional client that handles add/remove region follower requests.
    region_follower: Option<RegionFollowerClientRef>,
}
239
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
241
/// A trait for clients that can manage region followers.
///
/// `MetaClient` holds an optional implementation and forwards the
/// `add_region_follower` / `remove_region_follower` procedure calls to it.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles a request to add a region follower.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a request to remove a region follower.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Starts the client from a list of addresses; `MetaClient::start` passes
    /// the same URLs it hands to its other sub-clients.
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a leader provider instead of raw addresses.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
253
254#[async_trait::async_trait]
255impl ProcedureExecutor for MetaClient {
256    async fn submit_ddl_task(
257        &self,
258        _ctx: &ExecutorContext,
259        request: SubmitDdlTaskRequest,
260    ) -> MetaResult<SubmitDdlTaskResponse> {
261        self.submit_ddl_task(request)
262            .await
263            .map_err(BoxedError::new)
264            .context(meta_error::ExternalSnafu)
265    }
266
267    async fn migrate_region(
268        &self,
269        _ctx: &ExecutorContext,
270        request: MigrateRegionRequest,
271    ) -> MetaResult<MigrateRegionResponse> {
272        self.migrate_region(request)
273            .await
274            .map_err(BoxedError::new)
275            .context(meta_error::ExternalSnafu)
276    }
277
278    async fn reconcile(
279        &self,
280        _ctx: &ExecutorContext,
281        request: ReconcileRequest,
282    ) -> MetaResult<ReconcileResponse> {
283        self.reconcile(request)
284            .await
285            .map_err(BoxedError::new)
286            .context(meta_error::ExternalSnafu)
287    }
288
289    async fn add_region_follower(
290        &self,
291        _ctx: &ExecutorContext,
292        request: AddRegionFollowerRequest,
293    ) -> MetaResult<()> {
294        if let Some(region_follower) = &self.region_follower {
295            region_follower
296                .add_region_follower(request)
297                .await
298                .map_err(BoxedError::new)
299                .context(meta_error::ExternalSnafu)
300        } else {
301            UnsupportedSnafu {
302                operation: "add_region_follower",
303            }
304            .fail()
305        }
306    }
307
308    async fn remove_region_follower(
309        &self,
310        _ctx: &ExecutorContext,
311        request: RemoveRegionFollowerRequest,
312    ) -> MetaResult<()> {
313        if let Some(region_follower) = &self.region_follower {
314            region_follower
315                .remove_region_follower(request)
316                .await
317                .map_err(BoxedError::new)
318                .context(meta_error::ExternalSnafu)
319        } else {
320            UnsupportedSnafu {
321                operation: "remove_region_follower",
322            }
323            .fail()
324        }
325    }
326
327    async fn query_procedure_state(
328        &self,
329        _ctx: &ExecutorContext,
330        pid: &str,
331    ) -> MetaResult<ProcedureStateResponse> {
332        self.query_procedure_state(pid)
333            .await
334            .map_err(BoxedError::new)
335            .context(meta_error::ExternalSnafu)
336    }
337
338    async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
339        self.procedure_client()
340            .map_err(BoxedError::new)
341            .context(meta_error::ExternalSnafu)?
342            .list_procedures()
343            .await
344            .map_err(BoxedError::new)
345            .context(meta_error::ExternalSnafu)
346    }
347}
348
349#[async_trait::async_trait]
350impl ClusterInfo for MetaClient {
351    type Error = Error;
352
353    async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
354        let cluster_client = self.cluster_client()?;
355
356        let (get_metasrv_nodes, nodes_key_prefix) = match role {
357            None => (true, Some(NodeInfoKey::key_prefix())),
358            Some(ClusterRole::Metasrv) => (true, None),
359            Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
360        };
361
362        let mut nodes = if get_metasrv_nodes {
363            let last_activity_ts = -1; // Metasrv does not provide this information.
364
365            let (leader, followers) = cluster_client.get_metasrv_peers().await?;
366            followers
367                .into_iter()
368                .map(|node| NodeInfo {
369                    peer: node.peer.unwrap_or_default(),
370                    last_activity_ts,
371                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
372                    version: node.version,
373                    git_commit: node.git_commit,
374                    start_time_ms: node.start_time_ms,
375                })
376                .chain(leader.into_iter().map(|node| NodeInfo {
377                    peer: node.peer.unwrap_or_default(),
378                    last_activity_ts,
379                    status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
380                    version: node.version,
381                    git_commit: node.git_commit,
382                    start_time_ms: node.start_time_ms,
383                }))
384                .collect::<Vec<_>>()
385        } else {
386            Vec::new()
387        };
388
389        if let Some(prefix) = nodes_key_prefix {
390            let req = RangeRequest::new().with_prefix(prefix);
391            let res = cluster_client.range(req).await?;
392            for kv in res.kvs {
393                nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
394            }
395        }
396
397        Ok(nodes)
398    }
399
400    async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
401        let cluster_kv_backend = Arc::new(self.cluster_client()?);
402        let range_prefix = DatanodeStatKey::prefix_key();
403        let req = RangeRequest::new().with_prefix(range_prefix);
404        let stream =
405            PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
406        let mut datanode_stats = stream
407            .try_collect::<Vec<_>>()
408            .await
409            .context(ConvertMetaResponseSnafu)?;
410        let region_stats = datanode_stats
411            .iter_mut()
412            .flat_map(|datanode_stat| {
413                let last = datanode_stat.stats.pop();
414                last.map(|stat| stat.region_stats).unwrap_or_default()
415            })
416            .collect::<Vec<_>>();
417
418        Ok(region_stats)
419    }
420
421    async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
422        let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
423        let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
424        let flow_state_manager = FlowStateManager::new(cluster_backend);
425        let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
426
427        Ok(res.map(|r| r.into()))
428    }
429}
430
431fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
432    DatanodeStatValue::try_from(kv.value)
433        .map_err(BoxedError::new)
434        .context(ExternalSnafu)
435}
436
437impl MetaClient {
438    pub fn new(id: Id) -> Self {
439        Self {
440            id,
441            ..Default::default()
442        }
443    }
444
445    pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
446        Self {
447            id,
448            channel_manager,
449            ..Default::default()
450        }
451    }
452
453    pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
454    where
455        U: AsRef<str>,
456        A: AsRef<[U]> + Clone,
457    {
458        info!("MetaClient channel config: {:?}", self.channel_config());
459
460        if let Some(client) = &mut self.region_follower {
461            let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
462            client.start(&urls).await?;
463            info!("Region follower client started");
464        }
465        if let Some(client) = &mut self.heartbeat {
466            client.start(urls.clone()).await?;
467            info!("Heartbeat client started");
468        }
469        if let Some(client) = &mut self.store {
470            client.start(urls.clone()).await?;
471            info!("Store client started");
472        }
473        if let Some(client) = &mut self.procedure {
474            client.start(urls.clone()).await?;
475            info!("DDL client started");
476        }
477        if let Some(client) = &mut self.cluster {
478            client.start(urls).await?;
479            info!("Cluster client started");
480        }
481
482        Ok(())
483    }
484
485    /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
486    pub(crate) async fn start_with<U, A>(
487        &mut self,
488        leader_provider: LeaderProviderRef,
489        peers: A,
490    ) -> Result<()>
491    where
492        U: AsRef<str>,
493        A: AsRef<[U]> + Clone,
494    {
495        if let Some(client) = &self.region_follower {
496            info!("Starting region follower client ...");
497            client.start_with(leader_provider.clone()).await?;
498        }
499
500        if let Some(client) = &self.heartbeat {
501            info!("Starting heartbeat client ...");
502            client.start_with(leader_provider.clone()).await?;
503        }
504
505        if let Some(client) = &mut self.store {
506            info!("Starting store client ...");
507            client.start(peers.clone()).await?;
508        }
509
510        if let Some(client) = &self.procedure {
511            info!("Starting procedure client ...");
512            client.start_with(leader_provider.clone()).await?;
513        }
514
515        if let Some(client) = &mut self.cluster {
516            info!("Starting cluster client ...");
517            client.start_with(leader_provider).await?;
518        }
519        Ok(())
520    }
521
522    /// Ask the leader address of `metasrv`, and the heartbeat component
523    /// needs to create a bidirectional streaming to the leader.
524    pub async fn ask_leader(&self) -> Result<String> {
525        self.heartbeat_client()?.ask_leader().await
526    }
527
528    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
529    /// other end is the leader of `metasrv`.
530    ///
531    /// The `datanode` needs to use the sender to continuously send heartbeat
532    /// packets (some self-state data), and the receiver can receive a response
533    /// from "metasrv" (which may contain some scheduling instructions).
534    pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
535        self.heartbeat_client()?.heartbeat().await
536    }
537
538    /// Range gets the keys in the range from the key-value store.
539    pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
540        self.store_client()?
541            .range(req.into())
542            .await?
543            .try_into()
544            .context(ConvertMetaResponseSnafu)
545    }
546
547    /// Put puts the given key into the key-value store.
548    pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
549        self.store_client()?
550            .put(req.into())
551            .await?
552            .try_into()
553            .context(ConvertMetaResponseSnafu)
554    }
555
556    /// BatchGet atomically get values by the given keys from the key-value store.
557    pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
558        self.store_client()?
559            .batch_get(req.into())
560            .await?
561            .try_into()
562            .context(ConvertMetaResponseSnafu)
563    }
564
565    /// BatchPut atomically puts the given keys into the key-value store.
566    pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
567        self.store_client()?
568            .batch_put(req.into())
569            .await?
570            .try_into()
571            .context(ConvertMetaResponseSnafu)
572    }
573
574    /// BatchDelete atomically deletes the given keys from the key-value store.
575    pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
576        self.store_client()?
577            .batch_delete(req.into())
578            .await?
579            .try_into()
580            .context(ConvertMetaResponseSnafu)
581    }
582
583    /// CompareAndPut atomically puts the value to the given updated
584    /// value if the current value == the expected value.
585    pub async fn compare_and_put(
586        &self,
587        req: CompareAndPutRequest,
588    ) -> Result<CompareAndPutResponse> {
589        self.store_client()?
590            .compare_and_put(req.into())
591            .await?
592            .try_into()
593            .context(ConvertMetaResponseSnafu)
594    }
595
596    /// DeleteRange deletes the given range from the key-value store.
597    pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
598        self.store_client()?
599            .delete_range(req.into())
600            .await?
601            .try_into()
602            .context(ConvertMetaResponseSnafu)
603    }
604
605    /// Query the procedure state by its id.
606    pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
607        self.procedure_client()?.query_procedure_state(pid).await
608    }
609
610    /// Submit a region migration task.
611    pub async fn migrate_region(
612        &self,
613        request: MigrateRegionRequest,
614    ) -> Result<MigrateRegionResponse> {
615        self.procedure_client()?
616            .migrate_region(
617                request.region_id,
618                request.from_peer,
619                request.to_peer,
620                request.timeout,
621            )
622            .await
623    }
624
625    /// Reconcile the procedure state.
626    pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
627        self.procedure_client()?.reconcile(request).await
628    }
629
630    /// Submit a DDL task
631    pub async fn submit_ddl_task(
632        &self,
633        req: SubmitDdlTaskRequest,
634    ) -> Result<SubmitDdlTaskResponse> {
635        let res = self
636            .procedure_client()?
637            .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
638            .await?
639            .try_into()
640            .context(ConvertMetaResponseSnafu)?;
641
642        Ok(res)
643    }
644
645    pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
646        self.heartbeat.clone().context(NotStartedSnafu {
647            name: "heartbeat_client",
648        })
649    }
650
651    pub fn store_client(&self) -> Result<StoreClient> {
652        self.store.clone().context(NotStartedSnafu {
653            name: "store_client",
654        })
655    }
656
657    pub fn procedure_client(&self) -> Result<ProcedureClient> {
658        self.procedure.clone().context(NotStartedSnafu {
659            name: "procedure_client",
660        })
661    }
662
663    pub fn cluster_client(&self) -> Result<ClusterClient> {
664        self.cluster.clone().context(NotStartedSnafu {
665            name: "cluster_client",
666        })
667    }
668
669    pub fn channel_config(&self) -> &ChannelConfig {
670        self.channel_manager.config()
671    }
672
673    pub fn id(&self) -> Id {
674        self.id
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use std::sync::atomic::{AtomicUsize, Ordering};
681
682    use api::v1::meta::{HeartbeatRequest, Peer};
683    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
684    use rand::Rng;
685
686    use super::*;
687    use crate::error;
688    use crate::mocks::{self, MockMetaContext};
689
    /// Prefix shared by every key the tests write, so `clear_data` can remove
    /// them with a single prefix delete.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
691
    /// Test fixture bundling a mock-backed `MetaClient` with the namespace
    /// used to build its keys.
    struct TestClient {
        /// Namespace embedded in every key produced by `key`.
        ns: String,
        /// Client under test.
        client: MetaClient,
        /// Mock context returned alongside the client by the mock constructor.
        meta_ctx: MockMetaContext,
    }
697
698    impl TestClient {
699        async fn new(ns: impl Into<String>) -> Self {
700            // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
701            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
702            Self {
703                ns: ns.into(),
704                client,
705                meta_ctx,
706            }
707        }
708
709        fn key(&self, name: &str) -> Vec<u8> {
710            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
711        }
712
713        async fn gen_data(&self) {
714            for i in 0..10 {
715                let req = PutRequest::new()
716                    .with_key(self.key(&format!("key-{i}")))
717                    .with_value(format!("{}-{}", "value", i).into_bytes())
718                    .with_prev_kv();
719                let res = self.client.put(req).await;
720                let _ = res.unwrap();
721            }
722        }
723
724        async fn clear_data(&self) {
725            let req =
726                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
727            let res = self.client.delete_range(req).await;
728            let _ = res.unwrap();
729        }
730
731        #[allow(dead_code)]
732        fn kv_backend(&self) -> KvBackendRef {
733            self.meta_ctx.kv_backend.clone()
734        }
735
736        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
737            self.meta_ctx.in_memory.clone()
738        }
739    }
740
741    async fn new_client(ns: impl Into<String>) -> TestClient {
742        let client = TestClient::new(ns).await;
743        client.clear_data().await;
744        client
745    }
746
747    #[tokio::test]
748    async fn test_meta_client_builder() {
749        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
750
751        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
752            .enable_heartbeat()
753            .build();
754        let _ = meta_client.heartbeat_client().unwrap();
755        assert!(meta_client.store_client().is_err());
756        meta_client.start(urls).await.unwrap();
757
758        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
759        assert!(meta_client.heartbeat_client().is_err());
760        assert!(meta_client.store_client().is_err());
761        meta_client.start(urls).await.unwrap();
762
763        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
764            .enable_store()
765            .build();
766        assert!(meta_client.heartbeat_client().is_err());
767        let _ = meta_client.store_client().unwrap();
768        meta_client.start(urls).await.unwrap();
769
770        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
771            .enable_heartbeat()
772            .enable_store()
773            .build();
774        assert_eq!(2, meta_client.id());
775        assert_eq!(2, meta_client.id());
776        let _ = meta_client.heartbeat_client().unwrap();
777        let _ = meta_client.store_client().unwrap();
778        meta_client.start(urls).await.unwrap();
779    }
780
781    #[tokio::test]
782    async fn test_not_start_heartbeat_client() {
783        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
784        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
785            .enable_store()
786            .build();
787        meta_client.start(urls).await.unwrap();
788        let res = meta_client.ask_leader().await;
789        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
790    }
791
792    #[tokio::test]
793    async fn test_not_start_store_client() {
794        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
795        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
796            .enable_heartbeat()
797            .build();
798
799        meta_client.start(urls).await.unwrap();
800        let res = meta_client.put(PutRequest::default()).await;
801        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
802    }
803
804    #[tokio::test]
805    async fn test_ask_leader() {
806        let tc = new_client("test_ask_leader").await;
807        tc.client.ask_leader().await.unwrap();
808    }
809
810    #[tokio::test]
811    async fn test_heartbeat() {
812        let tc = new_client("test_heartbeat").await;
813        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
814        // send heartbeats
815
816        let request_sent = Arc::new(AtomicUsize::new(0));
817        let request_sent_clone = request_sent.clone();
818        let _handle = tokio::spawn(async move {
819            for _ in 0..5 {
820                let req = HeartbeatRequest {
821                    peer: Some(Peer {
822                        id: 1,
823                        addr: "meta_client_peer".to_string(),
824                    }),
825                    ..Default::default()
826                };
827                sender.send(req).await.unwrap();
828                request_sent_clone.fetch_add(1, Ordering::Relaxed);
829            }
830        });
831
832        let heartbeat_count = Arc::new(AtomicUsize::new(0));
833        let heartbeat_count_clone = heartbeat_count.clone();
834        let handle = tokio::spawn(async move {
835            while let Some(_resp) = receiver.message().await.unwrap() {
836                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
837            }
838        });
839
840        handle.await.unwrap();
841        //+1 for the initial response
842        assert_eq!(
843            request_sent.load(Ordering::Relaxed) + 1,
844            heartbeat_count.load(Ordering::Relaxed)
845        );
846    }
847
848    #[tokio::test]
849    async fn test_range_get() {
850        let tc = new_client("test_range_get").await;
851        tc.gen_data().await;
852
853        let key = tc.key("key-0");
854        let req = RangeRequest::new().with_key(key.as_slice());
855        let res = tc.client.range(req).await;
856        let mut kvs = res.unwrap().take_kvs();
857        assert_eq!(1, kvs.len());
858        let mut kv = kvs.pop().unwrap();
859        assert_eq!(key, kv.take_key());
860        assert_eq!(b"value-0".to_vec(), kv.take_value());
861    }
862
863    #[tokio::test]
864    async fn test_range_get_prefix() {
865        let tc = new_client("test_range_get_prefix").await;
866        tc.gen_data().await;
867
868        let req = RangeRequest::new().with_prefix(tc.key("key-"));
869        let res = tc.client.range(req).await;
870        let kvs = res.unwrap().take_kvs();
871        assert_eq!(10, kvs.len());
872        for (i, mut kv) in kvs.into_iter().enumerate() {
873            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
874            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
875        }
876    }
877
878    #[tokio::test]
879    async fn test_range() {
880        let tc = new_client("test_range").await;
881        tc.gen_data().await;
882
883        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
884        let res = tc.client.range(req).await;
885        let kvs = res.unwrap().take_kvs();
886        assert_eq!(3, kvs.len());
887        for (i, mut kv) in kvs.into_iter().enumerate() {
888            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
889            assert_eq!(
890                format!("{}-{}", "value", i + 5).into_bytes(),
891                kv.take_value()
892            );
893        }
894    }
895
896    #[tokio::test]
897    async fn test_range_keys_only() {
898        let tc = new_client("test_range_keys_only").await;
899        tc.gen_data().await;
900
901        let req = RangeRequest::new()
902            .with_range(tc.key("key-5"), tc.key("key-8"))
903            .with_keys_only();
904        let res = tc.client.range(req).await;
905        let kvs = res.unwrap().take_kvs();
906        assert_eq!(3, kvs.len());
907        for (i, mut kv) in kvs.into_iter().enumerate() {
908            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
909            assert!(kv.take_value().is_empty());
910        }
911    }
912
913    #[tokio::test]
914    async fn test_put() {
915        let tc = new_client("test_put").await;
916
917        let req = PutRequest::new()
918            .with_key(tc.key("key"))
919            .with_value(b"value".to_vec());
920        let res = tc.client.put(req).await;
921        assert!(res.unwrap().prev_kv.is_none());
922    }
923
924    #[tokio::test]
925    async fn test_put_with_prev_kv() {
926        let tc = new_client("test_put_with_prev_kv").await;
927
928        let key = tc.key("key");
929        let req = PutRequest::new()
930            .with_key(key.as_slice())
931            .with_value(b"value".to_vec())
932            .with_prev_kv();
933        let res = tc.client.put(req).await;
934        assert!(res.unwrap().prev_kv.is_none());
935
936        let req = PutRequest::new()
937            .with_key(key.as_slice())
938            .with_value(b"value1".to_vec())
939            .with_prev_kv();
940        let res = tc.client.put(req).await;
941        let mut kv = res.unwrap().prev_kv.unwrap();
942        assert_eq!(key, kv.take_key());
943        assert_eq!(b"value".to_vec(), kv.take_value());
944    }
945
946    #[tokio::test]
947    async fn test_batch_put() {
948        let tc = new_client("test_batch_put").await;
949
950        let mut req = BatchPutRequest::new();
951        for i in 0..275 {
952            req = req.add_kv(
953                tc.key(&format!("key-{}", i)),
954                format!("value-{}", i).into_bytes(),
955            );
956        }
957
958        let res = tc.client.batch_put(req).await;
959        assert_eq!(0, res.unwrap().take_prev_kvs().len());
960
961        let req = RangeRequest::new().with_prefix(tc.key("key-"));
962        let res = tc.client.range(req).await;
963        let kvs = res.unwrap().take_kvs();
964        assert_eq!(275, kvs.len());
965    }
966
967    #[tokio::test]
968    async fn test_batch_get() {
969        let tc = new_client("test_batch_get").await;
970        tc.gen_data().await;
971
972        let mut req = BatchGetRequest::default();
973        for i in 0..256 {
974            req = req.add_key(tc.key(&format!("key-{}", i)));
975        }
976        let res = tc.client.batch_get(req).await.unwrap();
977        assert_eq!(10, res.kvs.len());
978
979        let req = BatchGetRequest::default()
980            .add_key(tc.key("key-1"))
981            .add_key(tc.key("key-999"));
982        let res = tc.client.batch_get(req).await.unwrap();
983        assert_eq!(1, res.kvs.len());
984    }
985
986    #[tokio::test]
987    async fn test_batch_put_with_prev_kv() {
988        let tc = new_client("test_batch_put_with_prev_kv").await;
989
990        let key = tc.key("key");
991        let key2 = tc.key("key2");
992        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
993        let res = tc.client.batch_put(req).await;
994        assert_eq!(0, res.unwrap().take_prev_kvs().len());
995
996        let req = BatchPutRequest::new()
997            .add_kv(key.as_slice(), b"value-".to_vec())
998            .add_kv(key2.as_slice(), b"value2-".to_vec())
999            .with_prev_kv();
1000        let res = tc.client.batch_put(req).await;
1001        let mut kvs = res.unwrap().take_prev_kvs();
1002        assert_eq!(1, kvs.len());
1003        let mut kv = kvs.pop().unwrap();
1004        assert_eq!(key, kv.take_key());
1005        assert_eq!(b"value".to_vec(), kv.take_value());
1006    }
1007
1008    #[tokio::test]
1009    async fn test_compare_and_put() {
1010        let tc = new_client("test_compare_and_put").await;
1011
1012        let key = tc.key("key");
1013        let req = CompareAndPutRequest::new()
1014            .with_key(key.as_slice())
1015            .with_expect(b"expect".to_vec())
1016            .with_value(b"value".to_vec());
1017        let res = tc.client.compare_and_put(req).await;
1018        assert!(!res.unwrap().is_success());
1019
1020        // create if absent
1021        let req = CompareAndPutRequest::new()
1022            .with_key(key.as_slice())
1023            .with_value(b"value".to_vec());
1024        let res = tc.client.compare_and_put(req).await;
1025        let mut res = res.unwrap();
1026        assert!(res.is_success());
1027        assert!(res.take_prev_kv().is_none());
1028
1029        // compare and put fail
1030        let req = CompareAndPutRequest::new()
1031            .with_key(key.as_slice())
1032            .with_expect(b"not_eq".to_vec())
1033            .with_value(b"value2".to_vec());
1034        let res = tc.client.compare_and_put(req).await;
1035        let mut res = res.unwrap();
1036        assert!(!res.is_success());
1037        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1038
1039        // compare and put success
1040        let req = CompareAndPutRequest::new()
1041            .with_key(key.as_slice())
1042            .with_expect(b"value".to_vec())
1043            .with_value(b"value2".to_vec());
1044        let res = tc.client.compare_and_put(req).await;
1045        let mut res = res.unwrap();
1046        assert!(res.is_success());
1047
1048        // If compare-and-put is success, previous value doesn't need to be returned.
1049        assert!(res.take_prev_kv().is_none());
1050    }
1051
1052    #[tokio::test]
1053    async fn test_delete_with_key() {
1054        let tc = new_client("test_delete_with_key").await;
1055        tc.gen_data().await;
1056
1057        let req = DeleteRangeRequest::new()
1058            .with_key(tc.key("key-0"))
1059            .with_prev_kv();
1060        let res = tc.client.delete_range(req).await;
1061        let mut res = res.unwrap();
1062        assert_eq!(1, res.deleted());
1063        let mut kvs = res.take_prev_kvs();
1064        assert_eq!(1, kvs.len());
1065        let mut kv = kvs.pop().unwrap();
1066        assert_eq!(b"value-0".to_vec(), kv.take_value());
1067    }
1068
1069    #[tokio::test]
1070    async fn test_delete_with_prefix() {
1071        let tc = new_client("test_delete_with_prefix").await;
1072        tc.gen_data().await;
1073
1074        let req = DeleteRangeRequest::new()
1075            .with_prefix(tc.key("key-"))
1076            .with_prev_kv();
1077        let res = tc.client.delete_range(req).await;
1078        let mut res = res.unwrap();
1079        assert_eq!(10, res.deleted());
1080        let kvs = res.take_prev_kvs();
1081        assert_eq!(10, kvs.len());
1082        for (i, mut kv) in kvs.into_iter().enumerate() {
1083            assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1084        }
1085    }
1086
1087    #[tokio::test]
1088    async fn test_delete_with_range() {
1089        let tc = new_client("test_delete_with_range").await;
1090        tc.gen_data().await;
1091
1092        let req = DeleteRangeRequest::new()
1093            .with_range(tc.key("key-2"), tc.key("key-7"))
1094            .with_prev_kv();
1095        let res = tc.client.delete_range(req).await;
1096        let mut res = res.unwrap();
1097        assert_eq!(5, res.deleted());
1098        let kvs = res.take_prev_kvs();
1099        assert_eq!(5, kvs.len());
1100        for (i, mut kv) in kvs.into_iter().enumerate() {
1101            assert_eq!(
1102                format!("{}-{}", "value", i + 2).into_bytes(),
1103                kv.take_value()
1104            );
1105        }
1106    }
1107
1108    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1109        Ok(())
1110    }
1111
1112    #[tokio::test]
1113    async fn test_cluster_client_adaptive_range() {
1114        let tx = new_client("test_cluster_client").await;
1115        let in_memory = tx.in_memory().unwrap();
1116        let cluster_client = tx.client.cluster_client().unwrap();
1117        let mut rng = rand::rng();
1118
1119        // Generates rough 10MB data, which is larger than the default grpc message size limit.
1120        for i in 0..10 {
1121            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1122            in_memory
1123                .put(
1124                    PutRequest::new()
1125                        .with_key(format!("__prefix/{i}").as_bytes())
1126                        .with_value(data.clone()),
1127                )
1128                .await
1129                .unwrap();
1130        }
1131
1132        let req = RangeRequest::new().with_prefix(b"__prefix/");
1133        let stream =
1134            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1135
1136        let res = stream.try_collect::<Vec<_>>().await.unwrap();
1137        assert_eq!(10, res.len());
1138    }
1139}