1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::error::{
38 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
39};
40use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
41use common_meta::kv_backend::KvBackendRef;
42use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46 AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47 RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65 Result,
66};
67
/// Numeric identifier of the member that owns a [`MetaClient`].
pub type Id = u64;

/// Retry count handed to the cluster client when [`MetaClientBuilder::build`] creates it.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
/// Retry count handed to the procedure (DDL) client when [`MetaClientBuilder::build`] creates it.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Retry count handed to the heartbeat client when [`MetaClientBuilder::build`] creates it.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
73
74#[derive(Clone, Debug, Default)]
75pub struct MetaClientBuilder {
76 id: Id,
77 role: Role,
78 enable_heartbeat: bool,
79 enable_store: bool,
80 enable_procedure: bool,
81 enable_access_cluster_info: bool,
82 region_follower: Option<RegionFollowerClientRef>,
83 channel_manager: Option<ChannelManager>,
84 ddl_channel_manager: Option<ChannelManager>,
85 heartbeat_channel_manager: Option<ChannelManager>,
86}
87
88impl MetaClientBuilder {
89 pub fn new(member_id: u64, role: Role) -> Self {
90 Self {
91 id: member_id,
92 role,
93 ..Default::default()
94 }
95 }
96
97 pub fn frontend_default_options() -> Self {
99 Self::new(0, Role::Frontend)
101 .enable_store()
102 .enable_heartbeat()
103 .enable_procedure()
104 .enable_access_cluster_info()
105 }
106
107 pub fn datanode_default_options(member_id: u64) -> Self {
109 Self::new(member_id, Role::Datanode)
110 .enable_store()
111 .enable_heartbeat()
112 }
113
114 pub fn flownode_default_options(member_id: u64) -> Self {
116 Self::new(member_id, Role::Flownode)
117 .enable_store()
118 .enable_heartbeat()
119 .enable_procedure()
120 .enable_access_cluster_info()
121 }
122
123 pub fn enable_heartbeat(self) -> Self {
124 Self {
125 enable_heartbeat: true,
126 ..self
127 }
128 }
129
130 pub fn enable_store(self) -> Self {
131 Self {
132 enable_store: true,
133 ..self
134 }
135 }
136
137 pub fn enable_procedure(self) -> Self {
138 Self {
139 enable_procedure: true,
140 ..self
141 }
142 }
143
144 pub fn enable_access_cluster_info(self) -> Self {
145 Self {
146 enable_access_cluster_info: true,
147 ..self
148 }
149 }
150
151 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152 Self {
153 channel_manager: Some(channel_manager),
154 ..self
155 }
156 }
157
158 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159 Self {
160 ddl_channel_manager: Some(channel_manager),
161 ..self
162 }
163 }
164
165 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166 Self {
167 heartbeat_channel_manager: Some(channel_manager),
168 ..self
169 }
170 }
171
172 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173 Self {
174 region_follower: Some(region_follower),
175 ..self
176 }
177 }
178
179 pub fn build(self) -> MetaClient {
180 let mut client = if let Some(mgr) = self.channel_manager {
181 MetaClient::with_channel_manager(self.id, mgr)
182 } else {
183 MetaClient::new(self.id)
184 };
185
186 let mgr = client.channel_manager.clone();
187
188 if self.enable_heartbeat {
189 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190 client.heartbeat = Some(HeartbeatClient::new(
191 self.id,
192 self.role,
193 mgr,
194 DEFAULT_ASK_LEADER_MAX_RETRY,
195 ));
196 }
197
198 if self.enable_store {
199 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200 }
201
202 if self.enable_procedure {
203 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204 client.procedure = Some(ProcedureClient::new(
205 self.id,
206 self.role,
207 mgr,
208 DEFAULT_SUBMIT_DDL_MAX_RETRY,
209 ));
210 }
211
212 if self.enable_access_cluster_info {
213 client.cluster = Some(ClusterClient::new(
214 self.id,
215 self.role,
216 mgr,
217 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218 ))
219 }
220
221 if let Some(region_follower) = self.region_follower {
222 client.region_follower = Some(region_follower);
223 }
224
225 client
226 }
227}
228
229#[derive(Debug, Default)]
230pub struct MetaClient {
231 id: Id,
232 channel_manager: ChannelManager,
233 heartbeat: Option<HeartbeatClient>,
234 store: Option<StoreClient>,
235 procedure: Option<ProcedureClient>,
236 cluster: Option<ClusterClient>,
237 region_follower: Option<RegionFollowerClientRef>,
238}
239
240pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
241
242#[async_trait::async_trait]
244pub trait RegionFollowerClient: Sync + Send + Debug {
245 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
246
247 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
248
249 async fn start(&self, urls: &[&str]) -> Result<()>;
250
251 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
252}
253
254#[async_trait::async_trait]
255impl ProcedureExecutor for MetaClient {
256 async fn submit_ddl_task(
257 &self,
258 _ctx: &ExecutorContext,
259 request: SubmitDdlTaskRequest,
260 ) -> MetaResult<SubmitDdlTaskResponse> {
261 self.submit_ddl_task(request)
262 .await
263 .map_err(BoxedError::new)
264 .context(meta_error::ExternalSnafu)
265 }
266
267 async fn migrate_region(
268 &self,
269 _ctx: &ExecutorContext,
270 request: MigrateRegionRequest,
271 ) -> MetaResult<MigrateRegionResponse> {
272 self.migrate_region(request)
273 .await
274 .map_err(BoxedError::new)
275 .context(meta_error::ExternalSnafu)
276 }
277
278 async fn reconcile(
279 &self,
280 _ctx: &ExecutorContext,
281 request: ReconcileRequest,
282 ) -> MetaResult<ReconcileResponse> {
283 self.reconcile(request)
284 .await
285 .map_err(BoxedError::new)
286 .context(meta_error::ExternalSnafu)
287 }
288
289 async fn add_region_follower(
290 &self,
291 _ctx: &ExecutorContext,
292 request: AddRegionFollowerRequest,
293 ) -> MetaResult<()> {
294 if let Some(region_follower) = &self.region_follower {
295 region_follower
296 .add_region_follower(request)
297 .await
298 .map_err(BoxedError::new)
299 .context(meta_error::ExternalSnafu)
300 } else {
301 UnsupportedSnafu {
302 operation: "add_region_follower",
303 }
304 .fail()
305 }
306 }
307
308 async fn remove_region_follower(
309 &self,
310 _ctx: &ExecutorContext,
311 request: RemoveRegionFollowerRequest,
312 ) -> MetaResult<()> {
313 if let Some(region_follower) = &self.region_follower {
314 region_follower
315 .remove_region_follower(request)
316 .await
317 .map_err(BoxedError::new)
318 .context(meta_error::ExternalSnafu)
319 } else {
320 UnsupportedSnafu {
321 operation: "remove_region_follower",
322 }
323 .fail()
324 }
325 }
326
327 async fn query_procedure_state(
328 &self,
329 _ctx: &ExecutorContext,
330 pid: &str,
331 ) -> MetaResult<ProcedureStateResponse> {
332 self.query_procedure_state(pid)
333 .await
334 .map_err(BoxedError::new)
335 .context(meta_error::ExternalSnafu)
336 }
337
338 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
339 self.procedure_client()
340 .map_err(BoxedError::new)
341 .context(meta_error::ExternalSnafu)?
342 .list_procedures()
343 .await
344 .map_err(BoxedError::new)
345 .context(meta_error::ExternalSnafu)
346 }
347}
348
349#[async_trait::async_trait]
350impl ClusterInfo for MetaClient {
351 type Error = Error;
352
353 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
354 let cluster_client = self.cluster_client()?;
355
356 let (get_metasrv_nodes, nodes_key_prefix) = match role {
357 None => (true, Some(NodeInfoKey::key_prefix())),
358 Some(ClusterRole::Metasrv) => (true, None),
359 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
360 };
361
362 let mut nodes = if get_metasrv_nodes {
363 let last_activity_ts = -1; let (leader, followers) = cluster_client.get_metasrv_peers().await?;
366 followers
367 .into_iter()
368 .map(|node| NodeInfo {
369 peer: node.peer.unwrap_or_default(),
370 last_activity_ts,
371 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
372 version: node.version,
373 git_commit: node.git_commit,
374 start_time_ms: node.start_time_ms,
375 })
376 .chain(leader.into_iter().map(|node| NodeInfo {
377 peer: node.peer.unwrap_or_default(),
378 last_activity_ts,
379 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
380 version: node.version,
381 git_commit: node.git_commit,
382 start_time_ms: node.start_time_ms,
383 }))
384 .collect::<Vec<_>>()
385 } else {
386 Vec::new()
387 };
388
389 if let Some(prefix) = nodes_key_prefix {
390 let req = RangeRequest::new().with_prefix(prefix);
391 let res = cluster_client.range(req).await?;
392 for kv in res.kvs {
393 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
394 }
395 }
396
397 Ok(nodes)
398 }
399
400 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
401 let cluster_kv_backend = Arc::new(self.cluster_client()?);
402 let range_prefix = DatanodeStatKey::prefix_key();
403 let req = RangeRequest::new().with_prefix(range_prefix);
404 let stream =
405 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
406 let mut datanode_stats = stream
407 .try_collect::<Vec<_>>()
408 .await
409 .context(ConvertMetaResponseSnafu)?;
410 let region_stats = datanode_stats
411 .iter_mut()
412 .flat_map(|datanode_stat| {
413 let last = datanode_stat.stats.pop();
414 last.map(|stat| stat.region_stats).unwrap_or_default()
415 })
416 .collect::<Vec<_>>();
417
418 Ok(region_stats)
419 }
420
421 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
422 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
423 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
424 let flow_state_manager = FlowStateManager::new(cluster_backend);
425 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
426
427 Ok(res.map(|r| r.into()))
428 }
429}
430
431fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
432 DatanodeStatValue::try_from(kv.value)
433 .map_err(BoxedError::new)
434 .context(ExternalSnafu)
435}
436
437impl MetaClient {
438 pub fn new(id: Id) -> Self {
439 Self {
440 id,
441 ..Default::default()
442 }
443 }
444
445 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
446 Self {
447 id,
448 channel_manager,
449 ..Default::default()
450 }
451 }
452
453 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
454 where
455 U: AsRef<str>,
456 A: AsRef<[U]> + Clone,
457 {
458 info!("MetaClient channel config: {:?}", self.channel_config());
459
460 if let Some(client) = &mut self.region_follower {
461 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
462 client.start(&urls).await?;
463 info!("Region follower client started");
464 }
465 if let Some(client) = &mut self.heartbeat {
466 client.start(urls.clone()).await?;
467 info!("Heartbeat client started");
468 }
469 if let Some(client) = &mut self.store {
470 client.start(urls.clone()).await?;
471 info!("Store client started");
472 }
473 if let Some(client) = &mut self.procedure {
474 client.start(urls.clone()).await?;
475 info!("DDL client started");
476 }
477 if let Some(client) = &mut self.cluster {
478 client.start(urls).await?;
479 info!("Cluster client started");
480 }
481
482 Ok(())
483 }
484
485 pub(crate) async fn start_with<U, A>(
487 &mut self,
488 leader_provider: LeaderProviderRef,
489 peers: A,
490 ) -> Result<()>
491 where
492 U: AsRef<str>,
493 A: AsRef<[U]> + Clone,
494 {
495 if let Some(client) = &self.region_follower {
496 info!("Starting region follower client ...");
497 client.start_with(leader_provider.clone()).await?;
498 }
499
500 if let Some(client) = &self.heartbeat {
501 info!("Starting heartbeat client ...");
502 client.start_with(leader_provider.clone()).await?;
503 }
504
505 if let Some(client) = &mut self.store {
506 info!("Starting store client ...");
507 client.start(peers.clone()).await?;
508 }
509
510 if let Some(client) = &self.procedure {
511 info!("Starting procedure client ...");
512 client.start_with(leader_provider.clone()).await?;
513 }
514
515 if let Some(client) = &mut self.cluster {
516 info!("Starting cluster client ...");
517 client.start_with(leader_provider).await?;
518 }
519 Ok(())
520 }
521
522 pub async fn ask_leader(&self) -> Result<String> {
525 self.heartbeat_client()?.ask_leader().await
526 }
527
528 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
535 self.heartbeat_client()?.heartbeat().await
536 }
537
538 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
540 self.store_client()?
541 .range(req.into())
542 .await?
543 .try_into()
544 .context(ConvertMetaResponseSnafu)
545 }
546
547 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
549 self.store_client()?
550 .put(req.into())
551 .await?
552 .try_into()
553 .context(ConvertMetaResponseSnafu)
554 }
555
556 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
558 self.store_client()?
559 .batch_get(req.into())
560 .await?
561 .try_into()
562 .context(ConvertMetaResponseSnafu)
563 }
564
565 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
567 self.store_client()?
568 .batch_put(req.into())
569 .await?
570 .try_into()
571 .context(ConvertMetaResponseSnafu)
572 }
573
574 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
576 self.store_client()?
577 .batch_delete(req.into())
578 .await?
579 .try_into()
580 .context(ConvertMetaResponseSnafu)
581 }
582
583 pub async fn compare_and_put(
586 &self,
587 req: CompareAndPutRequest,
588 ) -> Result<CompareAndPutResponse> {
589 self.store_client()?
590 .compare_and_put(req.into())
591 .await?
592 .try_into()
593 .context(ConvertMetaResponseSnafu)
594 }
595
596 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
598 self.store_client()?
599 .delete_range(req.into())
600 .await?
601 .try_into()
602 .context(ConvertMetaResponseSnafu)
603 }
604
605 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
607 self.procedure_client()?.query_procedure_state(pid).await
608 }
609
610 pub async fn migrate_region(
612 &self,
613 request: MigrateRegionRequest,
614 ) -> Result<MigrateRegionResponse> {
615 self.procedure_client()?
616 .migrate_region(
617 request.region_id,
618 request.from_peer,
619 request.to_peer,
620 request.timeout,
621 )
622 .await
623 }
624
625 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
627 self.procedure_client()?.reconcile(request).await
628 }
629
630 pub async fn submit_ddl_task(
632 &self,
633 req: SubmitDdlTaskRequest,
634 ) -> Result<SubmitDdlTaskResponse> {
635 let res = self
636 .procedure_client()?
637 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
638 .await?
639 .try_into()
640 .context(ConvertMetaResponseSnafu)?;
641
642 Ok(res)
643 }
644
645 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
646 self.heartbeat.clone().context(NotStartedSnafu {
647 name: "heartbeat_client",
648 })
649 }
650
651 pub fn store_client(&self) -> Result<StoreClient> {
652 self.store.clone().context(NotStartedSnafu {
653 name: "store_client",
654 })
655 }
656
657 pub fn procedure_client(&self) -> Result<ProcedureClient> {
658 self.procedure.clone().context(NotStartedSnafu {
659 name: "procedure_client",
660 })
661 }
662
663 pub fn cluster_client(&self) -> Result<ClusterClient> {
664 self.cluster.clone().context(NotStartedSnafu {
665 name: "cluster_client",
666 })
667 }
668
669 pub fn channel_config(&self) -> &ChannelConfig {
670 self.channel_manager.config()
671 }
672
673 pub fn id(&self) -> Id {
674 self.id
675 }
676}
677
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use api::v1::meta::{HeartbeatRequest, Peer};
    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
    use rand::Rng;

    use super::*;
    use crate::error;
    use crate::mocks::{self, MockMetaContext};

    /// Prefix shared by every key these tests write.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// A mock-backed [`MetaClient`] plus a per-test namespace used to build keys.
    struct TestClient {
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }

    impl TestClient {
        /// Creates a client on top of the in-memory mock store.
        async fn new(ns: impl Into<String>) -> Self {
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds the key `<TEST_KEY_PREFIX>-<ns>-<name>` as bytes.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Writes `key-0..key-9` with the values `value-0..value-9`.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("value-{i}").into_bytes())
                    .with_prev_kv();
                let res = self.client.put(req).await;
                let _ = res.unwrap();
            }
        }

        /// Deletes every key that belongs to this client's namespace.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            let res = self.client.delete_range(req).await;
            let _ = res.unwrap();
        }

        // No test in this module calls this accessor, hence the `allow`.
        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        /// Returns the mock's in-memory backend, if it has one.
        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }

    /// Creates a [`TestClient`] and clears any data left in its namespace.
    async fn new_client(ns: impl Into<String>) -> TestClient {
        let client = TestClient::new(ns).await;
        client.clear_data().await;
        client
    }

    #[tokio::test]
    async fn test_meta_client_builder() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

        // Heartbeat only.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();
        let _ = meta_client.heartbeat_client().unwrap();
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // Nothing enabled.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
        assert!(meta_client.heartbeat_client().is_err());
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // Store only.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        assert!(meta_client.heartbeat_client().is_err());
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();

        // Heartbeat and store, with a non-zero member id.
        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
            .enable_heartbeat()
            .enable_store()
            .build();
        assert_eq!(2, meta_client.id());
        let _ = meta_client.heartbeat_client().unwrap();
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();
    }

    #[tokio::test]
    async fn test_not_start_heartbeat_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        meta_client.start(urls).await.unwrap();
        let res = meta_client.ask_leader().await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_not_start_store_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();

        meta_client.start(urls).await.unwrap();
        let res = meta_client.put(PutRequest::default()).await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let tc = new_client("test_ask_leader").await;
        tc.client.ask_leader().await.unwrap();
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();

        // Sender task: pushes five heartbeats and counts them.
        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        let _handle = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        // Receiver task: counts responses until the stream ends.
        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let handle = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        handle.await.unwrap();
        // One more response than requests is expected (presumably an initial
        // response sent by the mock server when the stream opens; confirm
        // against the mocks).
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }

    #[tokio::test]
    async fn test_range_get() {
        let tc = new_client("test_range_get").await;
        tc.gen_data().await;

        let key = tc.key("key-0");
        let req = RangeRequest::new().with_key(key.as_slice());
        let res = tc.client.range(req).await;
        let mut kvs = res.unwrap().take_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_range_get_prefix() {
        let tc = new_client("test_range_get_prefix").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range() {
        let tc = new_client("test_range").await;
        tc.gen_data().await;

        // The end key is exclusive: key-5, key-6 and key-7 are expected.
        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert_eq!(format!("value-{}", i + 5).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range_keys_only() {
        let tc = new_client("test_range_keys_only").await;
        tc.gen_data().await;

        let req = RangeRequest::new()
            .with_range(tc.key("key-5"), tc.key("key-8"))
            .with_keys_only();
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert!(kv.take_value().is_empty());
        }
    }

    #[tokio::test]
    async fn test_put() {
        let tc = new_client("test_put").await;

        let req = PutRequest::new()
            .with_key(tc.key("key"))
            .with_value(b"value".to_vec());
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());
    }

    #[tokio::test]
    async fn test_put_with_prev_kv() {
        let tc = new_client("test_put_with_prev_kv").await;

        // First write: there is no previous value to return.
        let key = tc.key("key");
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());

        // Second write: the first value comes back as `prev_kv`.
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value1".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        let mut kv = res.unwrap().prev_kv.unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_batch_put() {
        let tc = new_client("test_batch_put").await;

        let mut req = BatchPutRequest::new();
        for i in 0..275 {
            req = req.add_kv(
                tc.key(&format!("key-{i}")),
                format!("value-{i}").into_bytes(),
            );
        }

        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(275, kvs.len());
    }

    #[tokio::test]
    async fn test_batch_get() {
        let tc = new_client("test_batch_get").await;
        tc.gen_data().await;

        // 256 keys are requested but only the ten generated ones exist.
        let mut req = BatchGetRequest::default();
        for i in 0..256 {
            req = req.add_key(tc.key(&format!("key-{i}")));
        }
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(10, res.kvs.len());

        let req = BatchGetRequest::default()
            .add_key(tc.key("key-1"))
            .add_key(tc.key("key-999"));
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(1, res.kvs.len());
    }

    #[tokio::test]
    async fn test_batch_put_with_prev_kv() {
        let tc = new_client("test_batch_put_with_prev_kv").await;

        let key = tc.key("key");
        let key2 = tc.key("key2");
        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        // Only `key` existed before, so exactly one previous kv is returned.
        let req = BatchPutRequest::new()
            .add_kv(key.as_slice(), b"value-".to_vec())
            .add_kv(key2.as_slice(), b"value2-".to_vec())
            .with_prev_kv();
        let res = tc.client.batch_put(req).await;
        let mut kvs = res.unwrap().take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let tc = new_client("test_compare_and_put").await;

        // The key is absent, so expecting a non-empty value must fail.
        let key = tc.key("key");
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"expect".to_vec())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        assert!(!res.unwrap().is_success());

        // No expectation set: the put succeeds on the absent key.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());
        assert!(res.take_prev_kv().is_none());

        // Wrong expectation: the put fails and the current value is returned.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"not_eq".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(!res.is_success());
        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());

        // Matching expectation: the put succeeds and no previous kv is returned.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"value".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());

        assert!(res.take_prev_kv().is_none());
    }

    #[tokio::test]
    async fn test_delete_with_key() {
        let tc = new_client("test_delete_with_key").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_key(tc.key("key-0"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(1, res.deleted());
        let mut kvs = res.take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_delete_with_prefix() {
        let tc = new_client("test_delete_with_prefix").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_prefix(tc.key("key-"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(10, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_delete_with_range() {
        let tc = new_client("test_delete_with_range").await;
        tc.gen_data().await;

        // The end key is exclusive: key-2 through key-6 are deleted.
        let req = DeleteRangeRequest::new()
            .with_range(tc.key("key-2"), tc.key("key-7"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(5, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(5, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("value-{}", i + 2).into_bytes(), kv.take_value());
        }
    }

    /// Decoder that discards the key-value pair; only the item count matters.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }

    #[tokio::test]
    async fn test_cluster_client_adaptive_range() {
        let tc = new_client("test_cluster_client").await;
        let in_memory = tc.in_memory().unwrap();
        let cluster_client = tc.client.cluster_client().unwrap();
        let mut rng = rand::rng();

        // Ten entries of 1 MiB of random bytes each, written straight into the
        // in-memory backend.
        for i in 0..10 {
            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
            in_memory
                .put(
                    PutRequest::new()
                        .with_key(format!("__prefix/{i}").as_bytes())
                        .with_value(data),
                )
                .await
                .unwrap();
        }

        // A page size of 10 asks for all entries in a single page; all ten
        // must still come back through the cluster client.
        let req = RangeRequest::new().with_prefix(b"__prefix/");
        let stream =
            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();

        let res = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(10, res.len());
    }
}