1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
28pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::error::{
38 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
39};
40use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
41use common_meta::kv_backend::KvBackendRef;
42use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::KeyValue;
45use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
46use common_meta::rpc::procedure::{
47 AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
48 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
49 RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
50};
51use common_meta::rpc::store::{
52 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
53 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
54 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
55};
56use common_telemetry::info;
57use futures::TryStreamExt;
58use heartbeat::Client as HeartbeatClient;
59use procedure::Client as ProcedureClient;
60use snafu::{OptionExt, ResultExt};
61use store::Client as StoreClient;
62
63pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
64use crate::error::{
65 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
66 Result,
67};
68
/// Identifier of a meta client; holds the member id given to [`MetaClientBuilder::new`].
pub type Id = u64;
70
/// Max-retry value handed to the heartbeat client when it is constructed.
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Max-retry value handed to the procedure client when it is constructed.
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Max-retry value handed to the cluster client when it is constructed.
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
74
/// Builder that selects which sub-clients a [`MetaClient`] is created with.
///
/// All sub-clients start disabled and all channel managers start unset
/// (the struct derives `Default`).
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Member id passed to the `MetaClient` and to every sub-client.
    id: Id,
    /// Role passed to every sub-client.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client installed on the built `MetaClient`.
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager for the `MetaClient`; when unset, `MetaClient::new` is used instead.
    channel_manager: Option<ChannelManager>,
    /// Channel manager override for the procedure client only.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager override for the heartbeat client only.
    heartbeat_channel_manager: Option<ChannelManager>,
}
88
89impl MetaClientBuilder {
90 pub fn new(member_id: u64, role: Role) -> Self {
91 Self {
92 id: member_id,
93 role,
94 ..Default::default()
95 }
96 }
97
98 pub fn frontend_default_options() -> Self {
100 Self::new(0, Role::Frontend)
102 .enable_store()
103 .enable_heartbeat()
104 .enable_procedure()
105 .enable_access_cluster_info()
106 }
107
108 pub fn datanode_default_options(member_id: u64) -> Self {
110 Self::new(member_id, Role::Datanode)
111 .enable_store()
112 .enable_heartbeat()
113 }
114
115 pub fn flownode_default_options(member_id: u64) -> Self {
117 Self::new(member_id, Role::Flownode)
118 .enable_store()
119 .enable_heartbeat()
120 .enable_procedure()
121 .enable_access_cluster_info()
122 }
123
124 pub fn enable_heartbeat(self) -> Self {
125 Self {
126 enable_heartbeat: true,
127 ..self
128 }
129 }
130
131 pub fn enable_store(self) -> Self {
132 Self {
133 enable_store: true,
134 ..self
135 }
136 }
137
138 pub fn enable_procedure(self) -> Self {
139 Self {
140 enable_procedure: true,
141 ..self
142 }
143 }
144
145 pub fn enable_access_cluster_info(self) -> Self {
146 Self {
147 enable_access_cluster_info: true,
148 ..self
149 }
150 }
151
152 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
153 Self {
154 channel_manager: Some(channel_manager),
155 ..self
156 }
157 }
158
159 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
160 Self {
161 ddl_channel_manager: Some(channel_manager),
162 ..self
163 }
164 }
165
166 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
167 Self {
168 heartbeat_channel_manager: Some(channel_manager),
169 ..self
170 }
171 }
172
173 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
174 Self {
175 region_follower: Some(region_follower),
176 ..self
177 }
178 }
179
180 pub fn build(self) -> MetaClient {
181 let mut client = if let Some(mgr) = self.channel_manager {
182 MetaClient::with_channel_manager(self.id, mgr)
183 } else {
184 MetaClient::new(self.id)
185 };
186
187 let mgr = client.channel_manager.clone();
188
189 if self.enable_heartbeat {
190 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
191 client.heartbeat = Some(HeartbeatClient::new(
192 self.id,
193 self.role,
194 mgr,
195 DEFAULT_ASK_LEADER_MAX_RETRY,
196 ));
197 }
198
199 if self.enable_store {
200 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
201 }
202
203 if self.enable_procedure {
204 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
205 client.procedure = Some(ProcedureClient::new(
206 self.id,
207 self.role,
208 mgr,
209 DEFAULT_SUBMIT_DDL_MAX_RETRY,
210 ));
211 }
212
213 if self.enable_access_cluster_info {
214 client.cluster = Some(ClusterClient::new(
215 self.id,
216 self.role,
217 mgr,
218 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
219 ))
220 }
221
222 if let Some(region_follower) = self.region_follower {
223 client.region_follower = Some(region_follower);
224 }
225
226 client
227 }
228}
229
/// Facade over the optional sub-clients used to talk to the meta service.
///
/// A sub-client is `None` unless it was enabled on the [`MetaClientBuilder`];
/// the accessor methods report a `NotStarted` error for a missing one.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Member id of this client.
    id: Id,
    /// Channel manager whose config is exposed by `channel_config`.
    channel_manager: ChannelManager,
    /// Heartbeat sub-client, if enabled.
    heartbeat: Option<HeartbeatClient>,
    /// Key-value store sub-client, if enabled.
    store: Option<StoreClient>,
    /// Procedure (DDL, migration, reconcile) sub-client, if enabled.
    procedure: Option<ProcedureClient>,
    /// Cluster-info sub-client, if enabled.
    cluster: Option<ClusterClient>,
    /// Region follower client, if one was installed.
    region_follower: Option<RegionFollowerClientRef>,
}
240
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
242
/// Operations for managing region followers, plus the start hooks that
/// [`MetaClient`] calls alongside its other sub-clients.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an [`AddRegionFollowerRequest`].
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a [`RemoveRegionFollowerRequest`].
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Handles an [`AddTableFollowerRequest`].
    async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;

    /// Handles a [`RemoveTableFollowerRequest`].
    async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;

    /// Starts the client with the given urls. `MetaClient::start` passes the
    /// same urls it gives its other sub-clients (presumably metasrv addresses —
    /// confirm with implementors).
    async fn start(&self, urls: &[&str]) -> Result<()>;

    /// Starts the client using `leader_provider` instead of a url list.
    async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
258
259#[async_trait::async_trait]
260impl ProcedureExecutor for MetaClient {
261 async fn submit_ddl_task(
262 &self,
263 _ctx: &ExecutorContext,
264 request: SubmitDdlTaskRequest,
265 ) -> MetaResult<SubmitDdlTaskResponse> {
266 self.submit_ddl_task(request)
267 .await
268 .map_err(BoxedError::new)
269 .context(meta_error::ExternalSnafu)
270 }
271
272 async fn migrate_region(
273 &self,
274 _ctx: &ExecutorContext,
275 request: MigrateRegionRequest,
276 ) -> MetaResult<MigrateRegionResponse> {
277 self.migrate_region(request)
278 .await
279 .map_err(BoxedError::new)
280 .context(meta_error::ExternalSnafu)
281 }
282
283 async fn reconcile(
284 &self,
285 _ctx: &ExecutorContext,
286 request: ReconcileRequest,
287 ) -> MetaResult<ReconcileResponse> {
288 self.reconcile(request)
289 .await
290 .map_err(BoxedError::new)
291 .context(meta_error::ExternalSnafu)
292 }
293
294 async fn manage_region_follower(
295 &self,
296 _ctx: &ExecutorContext,
297 request: ManageRegionFollowerRequest,
298 ) -> MetaResult<()> {
299 if let Some(region_follower) = &self.region_follower {
300 match request {
301 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
302 region_follower
303 .add_region_follower(add_region_follower_request)
304 .await
305 }
306 ManageRegionFollowerRequest::RemoveRegionFollower(
307 remove_region_follower_request,
308 ) => {
309 region_follower
310 .remove_region_follower(remove_region_follower_request)
311 .await
312 }
313 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
314 region_follower
315 .add_table_follower(add_table_follower_request)
316 .await
317 }
318 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
319 region_follower
320 .remove_table_follower(remove_table_follower_request)
321 .await
322 }
323 }
324 .map_err(BoxedError::new)
325 .context(meta_error::ExternalSnafu)
326 } else {
327 UnsupportedSnafu {
328 operation: "manage_region_follower",
329 }
330 .fail()
331 }
332 }
333
334 async fn query_procedure_state(
335 &self,
336 _ctx: &ExecutorContext,
337 pid: &str,
338 ) -> MetaResult<ProcedureStateResponse> {
339 self.query_procedure_state(pid)
340 .await
341 .map_err(BoxedError::new)
342 .context(meta_error::ExternalSnafu)
343 }
344
345 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
346 self.procedure_client()
347 .map_err(BoxedError::new)
348 .context(meta_error::ExternalSnafu)?
349 .list_procedures()
350 .await
351 .map_err(BoxedError::new)
352 .context(meta_error::ExternalSnafu)
353 }
354}
355
356#[async_trait::async_trait]
357impl ClusterInfo for MetaClient {
358 type Error = Error;
359
360 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
361 let cluster_client = self.cluster_client()?;
362
363 let (get_metasrv_nodes, nodes_key_prefix) = match role {
364 None => (true, Some(NodeInfoKey::key_prefix())),
365 Some(ClusterRole::Metasrv) => (true, None),
366 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
367 };
368
369 let mut nodes = if get_metasrv_nodes {
370 let last_activity_ts = -1; let (leader, followers) = cluster_client.get_metasrv_peers().await?;
373 followers
374 .into_iter()
375 .map(|node| NodeInfo {
376 peer: node.peer.unwrap_or_default(),
377 last_activity_ts,
378 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
379 version: node.version,
380 git_commit: node.git_commit,
381 start_time_ms: node.start_time_ms,
382 cpus: node.cpus,
383 memory_bytes: node.memory_bytes,
384 })
385 .chain(leader.into_iter().map(|node| NodeInfo {
386 peer: node.peer.unwrap_or_default(),
387 last_activity_ts,
388 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
389 version: node.version,
390 git_commit: node.git_commit,
391 start_time_ms: node.start_time_ms,
392 cpus: node.cpus,
393 memory_bytes: node.memory_bytes,
394 }))
395 .collect::<Vec<_>>()
396 } else {
397 Vec::new()
398 };
399
400 if let Some(prefix) = nodes_key_prefix {
401 let req = RangeRequest::new().with_prefix(prefix);
402 let res = cluster_client.range(req).await?;
403 for kv in res.kvs {
404 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
405 }
406 }
407
408 Ok(nodes)
409 }
410
411 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
412 let cluster_kv_backend = Arc::new(self.cluster_client()?);
413 let range_prefix = DatanodeStatKey::prefix_key();
414 let req = RangeRequest::new().with_prefix(range_prefix);
415 let stream =
416 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
417 let mut datanode_stats = stream
418 .try_collect::<Vec<_>>()
419 .await
420 .context(ConvertMetaResponseSnafu)?;
421 let region_stats = datanode_stats
422 .iter_mut()
423 .flat_map(|datanode_stat| {
424 let last = datanode_stat.stats.pop();
425 last.map(|stat| stat.region_stats).unwrap_or_default()
426 })
427 .collect::<Vec<_>>();
428
429 Ok(region_stats)
430 }
431
432 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
433 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
434 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
435 let flow_state_manager = FlowStateManager::new(cluster_backend);
436 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
437
438 Ok(res.map(|r| r.into()))
439 }
440}
441
442fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
443 DatanodeStatValue::try_from(kv.value)
444 .map_err(BoxedError::new)
445 .context(ExternalSnafu)
446}
447
448impl MetaClient {
449 pub fn new(id: Id) -> Self {
450 Self {
451 id,
452 ..Default::default()
453 }
454 }
455
456 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
457 Self {
458 id,
459 channel_manager,
460 ..Default::default()
461 }
462 }
463
464 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
465 where
466 U: AsRef<str>,
467 A: AsRef<[U]> + Clone,
468 {
469 info!("MetaClient channel config: {:?}", self.channel_config());
470
471 if let Some(client) = &mut self.region_follower {
472 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
473 client.start(&urls).await?;
474 info!("Region follower client started");
475 }
476 if let Some(client) = &mut self.heartbeat {
477 client.start(urls.clone()).await?;
478 info!("Heartbeat client started");
479 }
480 if let Some(client) = &mut self.store {
481 client.start(urls.clone()).await?;
482 info!("Store client started");
483 }
484 if let Some(client) = &mut self.procedure {
485 client.start(urls.clone()).await?;
486 info!("DDL client started");
487 }
488 if let Some(client) = &mut self.cluster {
489 client.start(urls).await?;
490 info!("Cluster client started");
491 }
492
493 Ok(())
494 }
495
496 pub(crate) async fn start_with<U, A>(
498 &mut self,
499 leader_provider: LeaderProviderRef,
500 peers: A,
501 ) -> Result<()>
502 where
503 U: AsRef<str>,
504 A: AsRef<[U]> + Clone,
505 {
506 if let Some(client) = &self.region_follower {
507 info!("Starting region follower client ...");
508 client.start_with(leader_provider.clone()).await?;
509 }
510
511 if let Some(client) = &self.heartbeat {
512 info!("Starting heartbeat client ...");
513 client.start_with(leader_provider.clone()).await?;
514 }
515
516 if let Some(client) = &mut self.store {
517 info!("Starting store client ...");
518 client.start(peers.clone()).await?;
519 }
520
521 if let Some(client) = &self.procedure {
522 info!("Starting procedure client ...");
523 client.start_with(leader_provider.clone()).await?;
524 }
525
526 if let Some(client) = &mut self.cluster {
527 info!("Starting cluster client ...");
528 client.start_with(leader_provider).await?;
529 }
530 Ok(())
531 }
532
533 pub async fn ask_leader(&self) -> Result<String> {
536 self.heartbeat_client()?.ask_leader().await
537 }
538
539 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
546 self.heartbeat_client()?.heartbeat().await
547 }
548
549 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
551 self.store_client()?
552 .range(req.into())
553 .await?
554 .try_into()
555 .context(ConvertMetaResponseSnafu)
556 }
557
558 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
560 self.store_client()?
561 .put(req.into())
562 .await?
563 .try_into()
564 .context(ConvertMetaResponseSnafu)
565 }
566
567 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
569 self.store_client()?
570 .batch_get(req.into())
571 .await?
572 .try_into()
573 .context(ConvertMetaResponseSnafu)
574 }
575
576 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
578 self.store_client()?
579 .batch_put(req.into())
580 .await?
581 .try_into()
582 .context(ConvertMetaResponseSnafu)
583 }
584
585 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
587 self.store_client()?
588 .batch_delete(req.into())
589 .await?
590 .try_into()
591 .context(ConvertMetaResponseSnafu)
592 }
593
594 pub async fn compare_and_put(
597 &self,
598 req: CompareAndPutRequest,
599 ) -> Result<CompareAndPutResponse> {
600 self.store_client()?
601 .compare_and_put(req.into())
602 .await?
603 .try_into()
604 .context(ConvertMetaResponseSnafu)
605 }
606
607 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
609 self.store_client()?
610 .delete_range(req.into())
611 .await?
612 .try_into()
613 .context(ConvertMetaResponseSnafu)
614 }
615
616 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
618 self.procedure_client()?.query_procedure_state(pid).await
619 }
620
621 pub async fn migrate_region(
623 &self,
624 request: MigrateRegionRequest,
625 ) -> Result<MigrateRegionResponse> {
626 self.procedure_client()?
627 .migrate_region(
628 request.region_id,
629 request.from_peer,
630 request.to_peer,
631 request.timeout,
632 )
633 .await
634 }
635
636 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
638 self.procedure_client()?.reconcile(request).await
639 }
640
641 pub async fn submit_ddl_task(
643 &self,
644 req: SubmitDdlTaskRequest,
645 ) -> Result<SubmitDdlTaskResponse> {
646 let res = self
647 .procedure_client()?
648 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
649 .await?
650 .try_into()
651 .context(ConvertMetaResponseSnafu)?;
652
653 Ok(res)
654 }
655
656 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
657 self.heartbeat.clone().context(NotStartedSnafu {
658 name: "heartbeat_client",
659 })
660 }
661
662 pub fn store_client(&self) -> Result<StoreClient> {
663 self.store.clone().context(NotStartedSnafu {
664 name: "store_client",
665 })
666 }
667
668 pub fn procedure_client(&self) -> Result<ProcedureClient> {
669 self.procedure.clone().context(NotStartedSnafu {
670 name: "procedure_client",
671 })
672 }
673
674 pub fn cluster_client(&self) -> Result<ClusterClient> {
675 self.cluster.clone().context(NotStartedSnafu {
676 name: "cluster_client",
677 })
678 }
679
680 pub fn channel_config(&self) -> &ChannelConfig {
681 self.channel_manager.config()
682 }
683
684 pub fn id(&self) -> Id {
685 self.id
686 }
687}
688
689#[cfg(test)]
690mod tests {
691 use std::sync::atomic::{AtomicUsize, Ordering};
692
693 use api::v1::meta::{HeartbeatRequest, Peer};
694 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
695 use rand::Rng;
696
697 use super::*;
698 use crate::error;
699 use crate::mocks::{self, MockMetaContext};
700
    /// Prefix shared by every key these tests write; `clear_data` deletes by it.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
702
    /// Test fixture: a mock-backed `MetaClient` plus a per-test key namespace.
    struct TestClient {
        /// Namespace embedded in every generated key.
        ns: String,
        /// Client under test.
        client: MetaClient,
        /// Mock context returned together with the client.
        meta_ctx: MockMetaContext,
    }
708
709 impl TestClient {
710 async fn new(ns: impl Into<String>) -> Self {
711 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
713 Self {
714 ns: ns.into(),
715 client,
716 meta_ctx,
717 }
718 }
719
720 fn key(&self, name: &str) -> Vec<u8> {
721 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
722 }
723
724 async fn gen_data(&self) {
725 for i in 0..10 {
726 let req = PutRequest::new()
727 .with_key(self.key(&format!("key-{i}")))
728 .with_value(format!("{}-{}", "value", i).into_bytes())
729 .with_prev_kv();
730 let res = self.client.put(req).await;
731 let _ = res.unwrap();
732 }
733 }
734
735 async fn clear_data(&self) {
736 let req =
737 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
738 let res = self.client.delete_range(req).await;
739 let _ = res.unwrap();
740 }
741
742 #[allow(dead_code)]
743 fn kv_backend(&self) -> KvBackendRef {
744 self.meta_ctx.kv_backend.clone()
745 }
746
747 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
748 self.meta_ctx.in_memory.clone()
749 }
750 }
751
752 async fn new_client(ns: impl Into<String>) -> TestClient {
753 let client = TestClient::new(ns).await;
754 client.clear_data().await;
755 client
756 }
757
758 #[tokio::test]
759 async fn test_meta_client_builder() {
760 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
761
762 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
763 .enable_heartbeat()
764 .build();
765 let _ = meta_client.heartbeat_client().unwrap();
766 assert!(meta_client.store_client().is_err());
767 meta_client.start(urls).await.unwrap();
768
769 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
770 assert!(meta_client.heartbeat_client().is_err());
771 assert!(meta_client.store_client().is_err());
772 meta_client.start(urls).await.unwrap();
773
774 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
775 .enable_store()
776 .build();
777 assert!(meta_client.heartbeat_client().is_err());
778 let _ = meta_client.store_client().unwrap();
779 meta_client.start(urls).await.unwrap();
780
781 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
782 .enable_heartbeat()
783 .enable_store()
784 .build();
785 assert_eq!(2, meta_client.id());
786 assert_eq!(2, meta_client.id());
787 let _ = meta_client.heartbeat_client().unwrap();
788 let _ = meta_client.store_client().unwrap();
789 meta_client.start(urls).await.unwrap();
790 }
791
792 #[tokio::test]
793 async fn test_not_start_heartbeat_client() {
794 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
795 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
796 .enable_store()
797 .build();
798 meta_client.start(urls).await.unwrap();
799 let res = meta_client.ask_leader().await;
800 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
801 }
802
803 #[tokio::test]
804 async fn test_not_start_store_client() {
805 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
806 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
807 .enable_heartbeat()
808 .build();
809
810 meta_client.start(urls).await.unwrap();
811 let res = meta_client.put(PutRequest::default()).await;
812 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
813 }
814
815 #[tokio::test]
816 async fn test_ask_leader() {
817 let tc = new_client("test_ask_leader").await;
818 tc.client.ask_leader().await.unwrap();
819 }
820
821 #[tokio::test]
822 async fn test_heartbeat() {
823 let tc = new_client("test_heartbeat").await;
824 let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
825 let request_sent = Arc::new(AtomicUsize::new(0));
828 let request_sent_clone = request_sent.clone();
829 let _handle = tokio::spawn(async move {
830 for _ in 0..5 {
831 let req = HeartbeatRequest {
832 peer: Some(Peer {
833 id: 1,
834 addr: "meta_client_peer".to_string(),
835 }),
836 ..Default::default()
837 };
838 sender.send(req).await.unwrap();
839 request_sent_clone.fetch_add(1, Ordering::Relaxed);
840 }
841 });
842
843 let heartbeat_count = Arc::new(AtomicUsize::new(0));
844 let heartbeat_count_clone = heartbeat_count.clone();
845 let handle = tokio::spawn(async move {
846 while let Some(_resp) = receiver.message().await.unwrap() {
847 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
848 }
849 });
850
851 handle.await.unwrap();
852 assert_eq!(
854 request_sent.load(Ordering::Relaxed) + 1,
855 heartbeat_count.load(Ordering::Relaxed)
856 );
857 }
858
859 #[tokio::test]
860 async fn test_range_get() {
861 let tc = new_client("test_range_get").await;
862 tc.gen_data().await;
863
864 let key = tc.key("key-0");
865 let req = RangeRequest::new().with_key(key.as_slice());
866 let res = tc.client.range(req).await;
867 let mut kvs = res.unwrap().take_kvs();
868 assert_eq!(1, kvs.len());
869 let mut kv = kvs.pop().unwrap();
870 assert_eq!(key, kv.take_key());
871 assert_eq!(b"value-0".to_vec(), kv.take_value());
872 }
873
874 #[tokio::test]
875 async fn test_range_get_prefix() {
876 let tc = new_client("test_range_get_prefix").await;
877 tc.gen_data().await;
878
879 let req = RangeRequest::new().with_prefix(tc.key("key-"));
880 let res = tc.client.range(req).await;
881 let kvs = res.unwrap().take_kvs();
882 assert_eq!(10, kvs.len());
883 for (i, mut kv) in kvs.into_iter().enumerate() {
884 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
885 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
886 }
887 }
888
889 #[tokio::test]
890 async fn test_range() {
891 let tc = new_client("test_range").await;
892 tc.gen_data().await;
893
894 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
895 let res = tc.client.range(req).await;
896 let kvs = res.unwrap().take_kvs();
897 assert_eq!(3, kvs.len());
898 for (i, mut kv) in kvs.into_iter().enumerate() {
899 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
900 assert_eq!(
901 format!("{}-{}", "value", i + 5).into_bytes(),
902 kv.take_value()
903 );
904 }
905 }
906
907 #[tokio::test]
908 async fn test_range_keys_only() {
909 let tc = new_client("test_range_keys_only").await;
910 tc.gen_data().await;
911
912 let req = RangeRequest::new()
913 .with_range(tc.key("key-5"), tc.key("key-8"))
914 .with_keys_only();
915 let res = tc.client.range(req).await;
916 let kvs = res.unwrap().take_kvs();
917 assert_eq!(3, kvs.len());
918 for (i, mut kv) in kvs.into_iter().enumerate() {
919 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
920 assert!(kv.take_value().is_empty());
921 }
922 }
923
924 #[tokio::test]
925 async fn test_put() {
926 let tc = new_client("test_put").await;
927
928 let req = PutRequest::new()
929 .with_key(tc.key("key"))
930 .with_value(b"value".to_vec());
931 let res = tc.client.put(req).await;
932 assert!(res.unwrap().prev_kv.is_none());
933 }
934
935 #[tokio::test]
936 async fn test_put_with_prev_kv() {
937 let tc = new_client("test_put_with_prev_kv").await;
938
939 let key = tc.key("key");
940 let req = PutRequest::new()
941 .with_key(key.as_slice())
942 .with_value(b"value".to_vec())
943 .with_prev_kv();
944 let res = tc.client.put(req).await;
945 assert!(res.unwrap().prev_kv.is_none());
946
947 let req = PutRequest::new()
948 .with_key(key.as_slice())
949 .with_value(b"value1".to_vec())
950 .with_prev_kv();
951 let res = tc.client.put(req).await;
952 let mut kv = res.unwrap().prev_kv.unwrap();
953 assert_eq!(key, kv.take_key());
954 assert_eq!(b"value".to_vec(), kv.take_value());
955 }
956
957 #[tokio::test]
958 async fn test_batch_put() {
959 let tc = new_client("test_batch_put").await;
960
961 let mut req = BatchPutRequest::new();
962 for i in 0..275 {
963 req = req.add_kv(
964 tc.key(&format!("key-{}", i)),
965 format!("value-{}", i).into_bytes(),
966 );
967 }
968
969 let res = tc.client.batch_put(req).await;
970 assert_eq!(0, res.unwrap().take_prev_kvs().len());
971
972 let req = RangeRequest::new().with_prefix(tc.key("key-"));
973 let res = tc.client.range(req).await;
974 let kvs = res.unwrap().take_kvs();
975 assert_eq!(275, kvs.len());
976 }
977
978 #[tokio::test]
979 async fn test_batch_get() {
980 let tc = new_client("test_batch_get").await;
981 tc.gen_data().await;
982
983 let mut req = BatchGetRequest::default();
984 for i in 0..256 {
985 req = req.add_key(tc.key(&format!("key-{}", i)));
986 }
987 let res = tc.client.batch_get(req).await.unwrap();
988 assert_eq!(10, res.kvs.len());
989
990 let req = BatchGetRequest::default()
991 .add_key(tc.key("key-1"))
992 .add_key(tc.key("key-999"));
993 let res = tc.client.batch_get(req).await.unwrap();
994 assert_eq!(1, res.kvs.len());
995 }
996
997 #[tokio::test]
998 async fn test_batch_put_with_prev_kv() {
999 let tc = new_client("test_batch_put_with_prev_kv").await;
1000
1001 let key = tc.key("key");
1002 let key2 = tc.key("key2");
1003 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1004 let res = tc.client.batch_put(req).await;
1005 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1006
1007 let req = BatchPutRequest::new()
1008 .add_kv(key.as_slice(), b"value-".to_vec())
1009 .add_kv(key2.as_slice(), b"value2-".to_vec())
1010 .with_prev_kv();
1011 let res = tc.client.batch_put(req).await;
1012 let mut kvs = res.unwrap().take_prev_kvs();
1013 assert_eq!(1, kvs.len());
1014 let mut kv = kvs.pop().unwrap();
1015 assert_eq!(key, kv.take_key());
1016 assert_eq!(b"value".to_vec(), kv.take_value());
1017 }
1018
1019 #[tokio::test]
1020 async fn test_compare_and_put() {
1021 let tc = new_client("test_compare_and_put").await;
1022
1023 let key = tc.key("key");
1024 let req = CompareAndPutRequest::new()
1025 .with_key(key.as_slice())
1026 .with_expect(b"expect".to_vec())
1027 .with_value(b"value".to_vec());
1028 let res = tc.client.compare_and_put(req).await;
1029 assert!(!res.unwrap().is_success());
1030
1031 let req = CompareAndPutRequest::new()
1033 .with_key(key.as_slice())
1034 .with_value(b"value".to_vec());
1035 let res = tc.client.compare_and_put(req).await;
1036 let mut res = res.unwrap();
1037 assert!(res.is_success());
1038 assert!(res.take_prev_kv().is_none());
1039
1040 let req = CompareAndPutRequest::new()
1042 .with_key(key.as_slice())
1043 .with_expect(b"not_eq".to_vec())
1044 .with_value(b"value2".to_vec());
1045 let res = tc.client.compare_and_put(req).await;
1046 let mut res = res.unwrap();
1047 assert!(!res.is_success());
1048 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1049
1050 let req = CompareAndPutRequest::new()
1052 .with_key(key.as_slice())
1053 .with_expect(b"value".to_vec())
1054 .with_value(b"value2".to_vec());
1055 let res = tc.client.compare_and_put(req).await;
1056 let mut res = res.unwrap();
1057 assert!(res.is_success());
1058
1059 assert!(res.take_prev_kv().is_none());
1061 }
1062
1063 #[tokio::test]
1064 async fn test_delete_with_key() {
1065 let tc = new_client("test_delete_with_key").await;
1066 tc.gen_data().await;
1067
1068 let req = DeleteRangeRequest::new()
1069 .with_key(tc.key("key-0"))
1070 .with_prev_kv();
1071 let res = tc.client.delete_range(req).await;
1072 let mut res = res.unwrap();
1073 assert_eq!(1, res.deleted());
1074 let mut kvs = res.take_prev_kvs();
1075 assert_eq!(1, kvs.len());
1076 let mut kv = kvs.pop().unwrap();
1077 assert_eq!(b"value-0".to_vec(), kv.take_value());
1078 }
1079
1080 #[tokio::test]
1081 async fn test_delete_with_prefix() {
1082 let tc = new_client("test_delete_with_prefix").await;
1083 tc.gen_data().await;
1084
1085 let req = DeleteRangeRequest::new()
1086 .with_prefix(tc.key("key-"))
1087 .with_prev_kv();
1088 let res = tc.client.delete_range(req).await;
1089 let mut res = res.unwrap();
1090 assert_eq!(10, res.deleted());
1091 let kvs = res.take_prev_kvs();
1092 assert_eq!(10, kvs.len());
1093 for (i, mut kv) in kvs.into_iter().enumerate() {
1094 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1095 }
1096 }
1097
1098 #[tokio::test]
1099 async fn test_delete_with_range() {
1100 let tc = new_client("test_delete_with_range").await;
1101 tc.gen_data().await;
1102
1103 let req = DeleteRangeRequest::new()
1104 .with_range(tc.key("key-2"), tc.key("key-7"))
1105 .with_prev_kv();
1106 let res = tc.client.delete_range(req).await;
1107 let mut res = res.unwrap();
1108 assert_eq!(5, res.deleted());
1109 let kvs = res.take_prev_kvs();
1110 assert_eq!(5, kvs.len());
1111 for (i, mut kv) in kvs.into_iter().enumerate() {
1112 assert_eq!(
1113 format!("{}-{}", "value", i + 2).into_bytes(),
1114 kv.take_value()
1115 );
1116 }
1117 }
1118
    /// Decoder for `PaginationStream` that discards the entry and always
    /// succeeds; used when only the number of items matters.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }
1122
1123 #[tokio::test]
1124 async fn test_cluster_client_adaptive_range() {
1125 let tx = new_client("test_cluster_client").await;
1126 let in_memory = tx.in_memory().unwrap();
1127 let cluster_client = tx.client.cluster_client().unwrap();
1128 let mut rng = rand::rng();
1129
1130 for i in 0..10 {
1132 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1133 in_memory
1134 .put(
1135 PutRequest::new()
1136 .with_key(format!("__prefix/{i}").as_bytes())
1137 .with_value(data.clone()),
1138 )
1139 .await
1140 .unwrap();
1141 }
1142
1143 let req = RangeRequest::new().with_prefix(b"__prefix/");
1144 let stream =
1145 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1146
1147 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1148 assert_eq!(10, res.len());
1149 }
1150}