1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
38use common_meta::error::{
39 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
40};
41use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46 AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47 RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65 Result,
66};
67
/// Identifier of a meta client; `MetaClientBuilder::new` receives it as `member_id`.
pub type Id = u64;

/// Max-retry value passed to `HeartbeatClient::new` when the heartbeat client is built.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Max-retry value passed to `ProcedureClient::new` when the procedure client is built.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Max-retry value passed to `ClusterClient::new` when the cluster client is built.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
73
/// Builder that selects which sub-clients a [`MetaClient`] is assembled with.
///
/// Through the derived `Default`, every `enable_*` flag starts as `false` and every
/// optional field as `None`, so `build` only creates the sub-clients that were
/// explicitly enabled.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Member id handed to the `MetaClient` and to each sub-client it creates.
    id: Id,
    /// Role handed to each sub-client that `build` creates.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client installed on the built `MetaClient`.
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager for the `MetaClient`; when `None`, `build` falls back to
    /// `MetaClient::new`, which uses the default channel manager.
    channel_manager: Option<ChannelManager>,
    /// Channel manager used by the procedure client in place of the shared one, when set.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager used by the heartbeat client in place of the shared one, when set.
    heartbeat_channel_manager: Option<ChannelManager>,
}
87
88impl MetaClientBuilder {
89 pub fn new(member_id: u64, role: Role) -> Self {
90 Self {
91 id: member_id,
92 role,
93 ..Default::default()
94 }
95 }
96
97 pub fn frontend_default_options() -> Self {
99 Self::new(0, Role::Frontend)
101 .enable_store()
102 .enable_heartbeat()
103 .enable_procedure()
104 .enable_access_cluster_info()
105 }
106
107 pub fn datanode_default_options(member_id: u64) -> Self {
109 Self::new(member_id, Role::Datanode)
110 .enable_store()
111 .enable_heartbeat()
112 }
113
114 pub fn flownode_default_options(member_id: u64) -> Self {
116 Self::new(member_id, Role::Flownode)
117 .enable_store()
118 .enable_heartbeat()
119 .enable_procedure()
120 .enable_access_cluster_info()
121 }
122
123 pub fn enable_heartbeat(self) -> Self {
124 Self {
125 enable_heartbeat: true,
126 ..self
127 }
128 }
129
130 pub fn enable_store(self) -> Self {
131 Self {
132 enable_store: true,
133 ..self
134 }
135 }
136
137 pub fn enable_procedure(self) -> Self {
138 Self {
139 enable_procedure: true,
140 ..self
141 }
142 }
143
144 pub fn enable_access_cluster_info(self) -> Self {
145 Self {
146 enable_access_cluster_info: true,
147 ..self
148 }
149 }
150
151 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152 Self {
153 channel_manager: Some(channel_manager),
154 ..self
155 }
156 }
157
158 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159 Self {
160 ddl_channel_manager: Some(channel_manager),
161 ..self
162 }
163 }
164
165 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166 Self {
167 heartbeat_channel_manager: Some(channel_manager),
168 ..self
169 }
170 }
171
172 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173 Self {
174 region_follower: Some(region_follower),
175 ..self
176 }
177 }
178
179 pub fn build(self) -> MetaClient {
180 let mut client = if let Some(mgr) = self.channel_manager {
181 MetaClient::with_channel_manager(self.id, mgr)
182 } else {
183 MetaClient::new(self.id)
184 };
185
186 let mgr = client.channel_manager.clone();
187
188 if self.enable_heartbeat {
189 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190 client.heartbeat = Some(HeartbeatClient::new(
191 self.id,
192 self.role,
193 mgr,
194 DEFAULT_ASK_LEADER_MAX_RETRY,
195 ));
196 }
197
198 if self.enable_store {
199 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200 }
201
202 if self.enable_procedure {
203 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204 client.procedure = Some(ProcedureClient::new(
205 self.id,
206 self.role,
207 mgr,
208 DEFAULT_SUBMIT_DDL_MAX_RETRY,
209 ));
210 }
211
212 if self.enable_access_cluster_info {
213 client.cluster = Some(ClusterClient::new(
214 self.id,
215 self.role,
216 mgr,
217 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218 ))
219 }
220
221 if let Some(region_follower) = self.region_follower {
222 client.region_follower = Some(region_follower);
223 }
224
225 client
226 }
227}
228
/// Client-side facade bundling the optional heartbeat, store, procedure, cluster
/// and region-follower clients.
///
/// A sub-client that is `None` makes the matching accessor (`heartbeat_client`,
/// `store_client`, `procedure_client`, `cluster_client`) return a `NotStarted` error.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Identifier of this client, returned by `id()`.
    id: Id,
    /// Channel manager whose configuration is exposed through `channel_config()`.
    channel_manager: ChannelManager,
    /// Heartbeat sub-client, if present.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store sub-client, if present.
    store: Option<StoreClient>,
    /// Procedure (DDL, region migration) sub-client, if present.
    procedure: Option<ProcedureClient>,
    /// Cluster information sub-client, if present.
    cluster: Option<ClusterClient>,
    /// Region follower client, if present.
    region_follower: Option<RegionFollowerClientRef>,
}
239
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;

/// Client capable of adding and removing region followers.
///
/// `MetaClient` delegates its `add_region_follower` / `remove_region_follower`
/// procedure calls to an implementation of this trait when one is configured.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an add-region-follower request.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a remove-region-follower request.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Starts the client with the given urls (`MetaClient::start` passes its own urls here).
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client with a leader provider instead of a url list.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
253
254#[async_trait::async_trait]
255impl ProcedureExecutor for MetaClient {
256 async fn submit_ddl_task(
257 &self,
258 _ctx: &ExecutorContext,
259 request: SubmitDdlTaskRequest,
260 ) -> MetaResult<SubmitDdlTaskResponse> {
261 self.submit_ddl_task(request)
262 .await
263 .map_err(BoxedError::new)
264 .context(meta_error::ExternalSnafu)
265 }
266
267 async fn migrate_region(
268 &self,
269 _ctx: &ExecutorContext,
270 request: MigrateRegionRequest,
271 ) -> MetaResult<MigrateRegionResponse> {
272 self.migrate_region(request)
273 .await
274 .map_err(BoxedError::new)
275 .context(meta_error::ExternalSnafu)
276 }
277
278 async fn add_region_follower(
279 &self,
280 _ctx: &ExecutorContext,
281 request: AddRegionFollowerRequest,
282 ) -> MetaResult<()> {
283 if let Some(region_follower) = &self.region_follower {
284 region_follower
285 .add_region_follower(request)
286 .await
287 .map_err(BoxedError::new)
288 .context(meta_error::ExternalSnafu)
289 } else {
290 UnsupportedSnafu {
291 operation: "add_region_follower",
292 }
293 .fail()
294 }
295 }
296
297 async fn remove_region_follower(
298 &self,
299 _ctx: &ExecutorContext,
300 request: RemoveRegionFollowerRequest,
301 ) -> MetaResult<()> {
302 if let Some(region_follower) = &self.region_follower {
303 region_follower
304 .remove_region_follower(request)
305 .await
306 .map_err(BoxedError::new)
307 .context(meta_error::ExternalSnafu)
308 } else {
309 UnsupportedSnafu {
310 operation: "remove_region_follower",
311 }
312 .fail()
313 }
314 }
315
316 async fn query_procedure_state(
317 &self,
318 _ctx: &ExecutorContext,
319 pid: &str,
320 ) -> MetaResult<ProcedureStateResponse> {
321 self.query_procedure_state(pid)
322 .await
323 .map_err(BoxedError::new)
324 .context(meta_error::ExternalSnafu)
325 }
326
327 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
328 self.procedure_client()
329 .map_err(BoxedError::new)
330 .context(meta_error::ExternalSnafu)?
331 .list_procedures()
332 .await
333 .map_err(BoxedError::new)
334 .context(meta_error::ExternalSnafu)
335 }
336}
337
338#[async_trait::async_trait]
339impl ClusterInfo for MetaClient {
340 type Error = Error;
341
342 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
343 let cluster_client = self.cluster_client()?;
344
345 let (get_metasrv_nodes, nodes_key_prefix) = match role {
346 None => (true, Some(NodeInfoKey::key_prefix())),
347 Some(ClusterRole::Metasrv) => (true, None),
348 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
349 };
350
351 let mut nodes = if get_metasrv_nodes {
352 let last_activity_ts = -1; let (leader, followers) = cluster_client.get_metasrv_peers().await?;
355 followers
356 .into_iter()
357 .map(|node| NodeInfo {
358 peer: node.peer.unwrap_or_default(),
359 last_activity_ts,
360 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
361 version: node.version,
362 git_commit: node.git_commit,
363 start_time_ms: node.start_time_ms,
364 })
365 .chain(leader.into_iter().map(|node| NodeInfo {
366 peer: node.peer.unwrap_or_default(),
367 last_activity_ts,
368 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
369 version: node.version,
370 git_commit: node.git_commit,
371 start_time_ms: node.start_time_ms,
372 }))
373 .collect::<Vec<_>>()
374 } else {
375 Vec::new()
376 };
377
378 if let Some(prefix) = nodes_key_prefix {
379 let req = RangeRequest::new().with_prefix(prefix);
380 let res = cluster_client.range(req).await?;
381 for kv in res.kvs {
382 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
383 }
384 }
385
386 Ok(nodes)
387 }
388
389 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
390 let cluster_kv_backend = Arc::new(self.cluster_client()?);
391 let range_prefix = DatanodeStatKey::prefix_key();
392 let req = RangeRequest::new().with_prefix(range_prefix);
393 let stream =
394 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
395 let mut datanode_stats = stream
396 .try_collect::<Vec<_>>()
397 .await
398 .context(ConvertMetaResponseSnafu)?;
399 let region_stats = datanode_stats
400 .iter_mut()
401 .flat_map(|datanode_stat| {
402 let last = datanode_stat.stats.pop();
403 last.map(|stat| stat.region_stats).unwrap_or_default()
404 })
405 .collect::<Vec<_>>();
406
407 Ok(region_stats)
408 }
409
410 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
411 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
412 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
413 let flow_state_manager = FlowStateManager::new(cluster_backend);
414 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
415
416 Ok(res.map(|r| r.into()))
417 }
418}
419
420fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
421 DatanodeStatValue::try_from(kv.value)
422 .map_err(BoxedError::new)
423 .context(ExternalSnafu)
424}
425
426impl MetaClient {
427 pub fn new(id: Id) -> Self {
428 Self {
429 id,
430 ..Default::default()
431 }
432 }
433
434 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
435 Self {
436 id,
437 channel_manager,
438 ..Default::default()
439 }
440 }
441
442 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
443 where
444 U: AsRef<str>,
445 A: AsRef<[U]> + Clone,
446 {
447 info!("MetaClient channel config: {:?}", self.channel_config());
448
449 if let Some(client) = &mut self.region_follower {
450 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
451 client.start(&urls).await?;
452 info!("Region follower client started");
453 }
454 if let Some(client) = &mut self.heartbeat {
455 client.start(urls.clone()).await?;
456 info!("Heartbeat client started");
457 }
458 if let Some(client) = &mut self.store {
459 client.start(urls.clone()).await?;
460 info!("Store client started");
461 }
462 if let Some(client) = &mut self.procedure {
463 client.start(urls.clone()).await?;
464 info!("DDL client started");
465 }
466 if let Some(client) = &mut self.cluster {
467 client.start(urls).await?;
468 info!("Cluster client started");
469 }
470
471 Ok(())
472 }
473
474 pub(crate) async fn start_with<U, A>(
476 &mut self,
477 leader_provider: LeaderProviderRef,
478 peers: A,
479 ) -> Result<()>
480 where
481 U: AsRef<str>,
482 A: AsRef<[U]> + Clone,
483 {
484 if let Some(client) = &self.region_follower {
485 info!("Starting region follower client ...");
486 client.start_with(leader_provider.clone()).await?;
487 }
488
489 if let Some(client) = &self.heartbeat {
490 info!("Starting heartbeat client ...");
491 client.start_with(leader_provider.clone()).await?;
492 }
493
494 if let Some(client) = &mut self.store {
495 info!("Starting store client ...");
496 client.start(peers.clone()).await?;
497 }
498
499 if let Some(client) = &self.procedure {
500 info!("Starting procedure client ...");
501 client.start_with(leader_provider.clone()).await?;
502 }
503
504 if let Some(client) = &mut self.cluster {
505 info!("Starting cluster client ...");
506 client.start_with(leader_provider).await?;
507 }
508 Ok(())
509 }
510
511 pub async fn ask_leader(&self) -> Result<String> {
514 self.heartbeat_client()?.ask_leader().await
515 }
516
517 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
524 self.heartbeat_client()?.heartbeat().await
525 }
526
527 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
529 self.store_client()?
530 .range(req.into())
531 .await?
532 .try_into()
533 .context(ConvertMetaResponseSnafu)
534 }
535
536 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
538 self.store_client()?
539 .put(req.into())
540 .await?
541 .try_into()
542 .context(ConvertMetaResponseSnafu)
543 }
544
545 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
547 self.store_client()?
548 .batch_get(req.into())
549 .await?
550 .try_into()
551 .context(ConvertMetaResponseSnafu)
552 }
553
554 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
556 self.store_client()?
557 .batch_put(req.into())
558 .await?
559 .try_into()
560 .context(ConvertMetaResponseSnafu)
561 }
562
563 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
565 self.store_client()?
566 .batch_delete(req.into())
567 .await?
568 .try_into()
569 .context(ConvertMetaResponseSnafu)
570 }
571
572 pub async fn compare_and_put(
575 &self,
576 req: CompareAndPutRequest,
577 ) -> Result<CompareAndPutResponse> {
578 self.store_client()?
579 .compare_and_put(req.into())
580 .await?
581 .try_into()
582 .context(ConvertMetaResponseSnafu)
583 }
584
585 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
587 self.store_client()?
588 .delete_range(req.into())
589 .await?
590 .try_into()
591 .context(ConvertMetaResponseSnafu)
592 }
593
594 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
596 self.procedure_client()?.query_procedure_state(pid).await
597 }
598
599 pub async fn migrate_region(
601 &self,
602 request: MigrateRegionRequest,
603 ) -> Result<MigrateRegionResponse> {
604 self.procedure_client()?
605 .migrate_region(
606 request.region_id,
607 request.from_peer,
608 request.to_peer,
609 request.timeout,
610 )
611 .await
612 }
613
614 pub async fn submit_ddl_task(
616 &self,
617 req: SubmitDdlTaskRequest,
618 ) -> Result<SubmitDdlTaskResponse> {
619 let res = self
620 .procedure_client()?
621 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
622 .await?
623 .try_into()
624 .context(ConvertMetaResponseSnafu)?;
625
626 Ok(res)
627 }
628
629 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
630 self.heartbeat.clone().context(NotStartedSnafu {
631 name: "heartbeat_client",
632 })
633 }
634
635 pub fn store_client(&self) -> Result<StoreClient> {
636 self.store.clone().context(NotStartedSnafu {
637 name: "store_client",
638 })
639 }
640
641 pub fn procedure_client(&self) -> Result<ProcedureClient> {
642 self.procedure.clone().context(NotStartedSnafu {
643 name: "procedure_client",
644 })
645 }
646
647 pub fn cluster_client(&self) -> Result<ClusterClient> {
648 self.cluster.clone().context(NotStartedSnafu {
649 name: "cluster_client",
650 })
651 }
652
653 pub fn channel_config(&self) -> &ChannelConfig {
654 self.channel_manager.config()
655 }
656
657 pub fn id(&self) -> Id {
658 self.id
659 }
660}
661
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use api::v1::meta::{HeartbeatRequest, Peer};
    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
    use rand::Rng;

    use super::*;
    use crate::error;
    use crate::mocks::{self, MockMetaContext};

    /// Prefix of every key written by these tests.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// A mock-backed `MetaClient` together with a per-test key namespace.
    struct TestClient {
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }

    impl TestClient {
        /// Creates a client over the in-memory mock store, namespaced by `ns`.
        async fn new(ns: impl Into<String>) -> Self {
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds the namespaced key `<prefix>-<ns>-<name>` as bytes.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Writes `key-0..key-9` with values `value-0..value-9`.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("value-{i}").into_bytes())
                    .with_prev_kv();
                self.client.put(req).await.unwrap();
            }
        }

        /// Deletes every key in this client's namespace.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            self.client.delete_range(req).await.unwrap();
        }

        // Kept for tests that want direct backend access; currently unreferenced.
        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        /// Returns the mock context's in-memory backend, if any.
        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }

    /// Creates a `TestClient` for `ns` and clears any data left in that namespace.
    async fn new_client(ns: impl Into<String>) -> TestClient {
        let client = TestClient::new(ns).await;
        client.clear_data().await;
        client
    }

    /// The builder creates exactly the sub-clients that were enabled.
    #[tokio::test]
    async fn test_meta_client_builder() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();
        let _ = meta_client.heartbeat_client().unwrap();
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
        assert!(meta_client.heartbeat_client().is_err());
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        assert!(meta_client.heartbeat_client().is_err());
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();

        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
            .enable_heartbeat()
            .enable_store()
            .build();
        assert_eq!(2, meta_client.id());
        let _ = meta_client.heartbeat_client().unwrap();
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();
    }

    /// Heartbeat operations fail with `NotStarted` when only the store is enabled.
    #[tokio::test]
    async fn test_not_start_heartbeat_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        meta_client.start(urls).await.unwrap();
        let res = meta_client.ask_leader().await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    /// Store operations fail with `NotStarted` when only the heartbeat is enabled.
    #[tokio::test]
    async fn test_not_start_store_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();

        meta_client.start(urls).await.unwrap();
        let res = meta_client.put(PutRequest::default()).await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let tc = new_client("test_ask_leader").await;
        tc.client.ask_leader().await.unwrap();
    }

    /// Sends five heartbeats and counts the responses received on the stream.
    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        let _handle = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let handle = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        handle.await.unwrap();
        // One more response than requests is expected — presumably an initial
        // response emitted by the mock server; confirm against the mock.
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }

    #[tokio::test]
    async fn test_range_get() {
        let tc = new_client("test_range_get").await;
        tc.gen_data().await;

        let key = tc.key("key-0");
        let req = RangeRequest::new().with_key(key.as_slice());
        let res = tc.client.range(req).await;
        let mut kvs = res.unwrap().take_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_range_get_prefix() {
        let tc = new_client("test_range_get_prefix").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    /// A range request covers `[key-5, key-8)`, i.e. three keys.
    #[tokio::test]
    async fn test_range() {
        let tc = new_client("test_range").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert_eq!(
                format!("value-{}", i + 5).into_bytes(),
                kv.take_value()
            );
        }
    }

    #[tokio::test]
    async fn test_range_keys_only() {
        let tc = new_client("test_range_keys_only").await;
        tc.gen_data().await;

        let req = RangeRequest::new()
            .with_range(tc.key("key-5"), tc.key("key-8"))
            .with_keys_only();
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert!(kv.take_value().is_empty());
        }
    }

    #[tokio::test]
    async fn test_put() {
        let tc = new_client("test_put").await;

        let req = PutRequest::new()
            .with_key(tc.key("key"))
            .with_value(b"value".to_vec());
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());
    }

    #[tokio::test]
    async fn test_put_with_prev_kv() {
        let tc = new_client("test_put_with_prev_kv").await;

        let key = tc.key("key");
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());

        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value1".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        let mut kv = res.unwrap().prev_kv.unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_batch_put() {
        let tc = new_client("test_batch_put").await;

        let mut req = BatchPutRequest::new();
        for i in 0..275 {
            req = req.add_kv(
                tc.key(&format!("key-{}", i)),
                format!("value-{}", i).into_bytes(),
            );
        }

        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(275, kvs.len());
    }

    /// Only existing keys are returned: 10 of the 256 requested, then 1 of 2.
    #[tokio::test]
    async fn test_batch_get() {
        let tc = new_client("test_batch_get").await;
        tc.gen_data().await;

        let mut req = BatchGetRequest::default();
        for i in 0..256 {
            req = req.add_key(tc.key(&format!("key-{}", i)));
        }
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(10, res.kvs.len());

        let req = BatchGetRequest::default()
            .add_key(tc.key("key-1"))
            .add_key(tc.key("key-999"));
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(1, res.kvs.len());
    }

    #[tokio::test]
    async fn test_batch_put_with_prev_kv() {
        let tc = new_client("test_batch_put_with_prev_kv").await;

        let key = tc.key("key");
        let key2 = tc.key("key2");
        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = BatchPutRequest::new()
            .add_kv(key.as_slice(), b"value-".to_vec())
            .add_kv(key2.as_slice(), b"value2-".to_vec())
            .with_prev_kv();
        let res = tc.client.batch_put(req).await;
        let mut kvs = res.unwrap().take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let tc = new_client("test_compare_and_put").await;

        let key = tc.key("key");

        // The key does not exist yet, so a non-empty expectation fails.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"expect".to_vec())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        assert!(!res.unwrap().is_success());

        // No expectation set: succeeds on the missing key.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());
        assert!(res.take_prev_kv().is_none());

        // Wrong expectation: fails and reports the current value.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"not_eq".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(!res.is_success());
        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());

        // Matching expectation: succeeds.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"value".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());

        assert!(res.take_prev_kv().is_none());
    }

    #[tokio::test]
    async fn test_delete_with_key() {
        let tc = new_client("test_delete_with_key").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_key(tc.key("key-0"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(1, res.deleted());
        let mut kvs = res.take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_delete_with_prefix() {
        let tc = new_client("test_delete_with_prefix").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_prefix(tc.key("key-"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(10, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    /// A delete-range request covers `[key-2, key-7)`, i.e. five keys.
    #[tokio::test]
    async fn test_delete_with_range() {
        let tc = new_client("test_delete_with_range").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_range(tc.key("key-2"), tc.key("key-7"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(5, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(5, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(
                format!("value-{}", i + 2).into_bytes(),
                kv.take_value()
            );
        }
    }

    /// Decoder that discards the pair; the pagination test only counts items.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }

    /// Paginates over ten 1 MiB values through the cluster client with a page
    /// size of 10 and expects all ten entries back.
    #[tokio::test]
    async fn test_cluster_client_adaptive_range() {
        let tc = new_client("test_cluster_client").await;
        let in_memory = tc.in_memory().unwrap();
        let cluster_client = tc.client.cluster_client().unwrap();
        let mut rng = rand::rng();

        for i in 0..10 {
            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
            in_memory
                .put(
                    PutRequest::new()
                        .with_key(format!("__prefix/{i}").as_bytes())
                        // `data` is not used again, so move it instead of cloning 1 MiB.
                        .with_value(data),
                )
                .await
                .unwrap();
        }

        let req = RangeRequest::new().with_prefix(b"__prefix/");
        let stream =
            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();

        let res = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(10, res.len());
    }
}