1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49 AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51 RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::Client as HeartbeatClient;
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68 Result,
69};
70
71pub type Id = u64;
72
73const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
74const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
75const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79 id: Id,
80 role: Role,
81 enable_heartbeat: bool,
82 enable_store: bool,
83 enable_procedure: bool,
84 enable_access_cluster_info: bool,
85 region_follower: Option<RegionFollowerClientRef>,
86 channel_manager: Option<ChannelManager>,
87 ddl_channel_manager: Option<ChannelManager>,
88 heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92 pub fn new(member_id: u64, role: Role) -> Self {
93 Self {
94 id: member_id,
95 role,
96 ..Default::default()
97 }
98 }
99
100 pub fn frontend_default_options() -> Self {
102 Self::new(0, Role::Frontend)
104 .enable_store()
105 .enable_heartbeat()
106 .enable_procedure()
107 .enable_access_cluster_info()
108 }
109
110 pub fn datanode_default_options(member_id: u64) -> Self {
112 Self::new(member_id, Role::Datanode)
113 .enable_store()
114 .enable_heartbeat()
115 }
116
117 pub fn flownode_default_options(member_id: u64) -> Self {
119 Self::new(member_id, Role::Flownode)
120 .enable_store()
121 .enable_heartbeat()
122 .enable_procedure()
123 .enable_access_cluster_info()
124 }
125
126 pub fn enable_heartbeat(self) -> Self {
127 Self {
128 enable_heartbeat: true,
129 ..self
130 }
131 }
132
133 pub fn enable_store(self) -> Self {
134 Self {
135 enable_store: true,
136 ..self
137 }
138 }
139
140 pub fn enable_procedure(self) -> Self {
141 Self {
142 enable_procedure: true,
143 ..self
144 }
145 }
146
147 pub fn enable_access_cluster_info(self) -> Self {
148 Self {
149 enable_access_cluster_info: true,
150 ..self
151 }
152 }
153
154 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155 Self {
156 channel_manager: Some(channel_manager),
157 ..self
158 }
159 }
160
161 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162 Self {
163 ddl_channel_manager: Some(channel_manager),
164 ..self
165 }
166 }
167
168 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169 Self {
170 heartbeat_channel_manager: Some(channel_manager),
171 ..self
172 }
173 }
174
175 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176 Self {
177 region_follower: Some(region_follower),
178 ..self
179 }
180 }
181
182 pub fn build(self) -> MetaClient {
183 let mut client = if let Some(mgr) = self.channel_manager {
184 MetaClient::with_channel_manager(self.id, mgr)
185 } else {
186 MetaClient::new(self.id)
187 };
188
189 let mgr = client.channel_manager.clone();
190
191 if self.enable_heartbeat {
192 if self.heartbeat_channel_manager.is_some() {
193 info!("Enable heartbeat channel using the heartbeat channel manager.");
194 }
195 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
196 client.heartbeat = Some(HeartbeatClient::new(
197 self.id,
198 self.role,
199 mgr,
200 DEFAULT_ASK_LEADER_MAX_RETRY,
201 ));
202 }
203
204 if self.enable_store {
205 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
206 }
207
208 if self.enable_procedure {
209 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
210 client.procedure = Some(ProcedureClient::new(
211 self.id,
212 self.role,
213 mgr,
214 DEFAULT_SUBMIT_DDL_MAX_RETRY,
215 ));
216 }
217
218 if self.enable_access_cluster_info {
219 client.cluster = Some(ClusterClient::new(
220 self.id,
221 self.role,
222 mgr,
223 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
224 ))
225 }
226
227 if let Some(region_follower) = self.region_follower {
228 client.region_follower = Some(region_follower);
229 }
230
231 client
232 }
233}
234
235#[derive(Debug, Default)]
236pub struct MetaClient {
237 id: Id,
238 channel_manager: ChannelManager,
239 heartbeat: Option<HeartbeatClient>,
240 store: Option<StoreClient>,
241 procedure: Option<ProcedureClient>,
242 cluster: Option<ClusterClient>,
243 region_follower: Option<RegionFollowerClientRef>,
244}
245
246pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
247
248#[async_trait::async_trait]
250pub trait RegionFollowerClient: Sync + Send + Debug {
251 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
252
253 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
254
255 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
256
257 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
258
259 async fn start(&self, urls: &[&str]) -> Result<()>;
260
261 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
262}
263
264#[async_trait::async_trait]
265impl ProcedureExecutor for MetaClient {
266 async fn submit_ddl_task(
267 &self,
268 _ctx: &ExecutorContext,
269 request: SubmitDdlTaskRequest,
270 ) -> MetaResult<SubmitDdlTaskResponse> {
271 self.submit_ddl_task(request)
272 .await
273 .map_err(BoxedError::new)
274 .context(meta_error::ExternalSnafu)
275 }
276
277 async fn migrate_region(
278 &self,
279 _ctx: &ExecutorContext,
280 request: MigrateRegionRequest,
281 ) -> MetaResult<MigrateRegionResponse> {
282 self.migrate_region(request)
283 .await
284 .map_err(BoxedError::new)
285 .context(meta_error::ExternalSnafu)
286 }
287
288 async fn reconcile(
289 &self,
290 _ctx: &ExecutorContext,
291 request: ReconcileRequest,
292 ) -> MetaResult<ReconcileResponse> {
293 self.reconcile(request)
294 .await
295 .map_err(BoxedError::new)
296 .context(meta_error::ExternalSnafu)
297 }
298
299 async fn manage_region_follower(
300 &self,
301 _ctx: &ExecutorContext,
302 request: ManageRegionFollowerRequest,
303 ) -> MetaResult<()> {
304 if let Some(region_follower) = &self.region_follower {
305 match request {
306 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
307 region_follower
308 .add_region_follower(add_region_follower_request)
309 .await
310 }
311 ManageRegionFollowerRequest::RemoveRegionFollower(
312 remove_region_follower_request,
313 ) => {
314 region_follower
315 .remove_region_follower(remove_region_follower_request)
316 .await
317 }
318 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
319 region_follower
320 .add_table_follower(add_table_follower_request)
321 .await
322 }
323 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
324 region_follower
325 .remove_table_follower(remove_table_follower_request)
326 .await
327 }
328 }
329 .map_err(BoxedError::new)
330 .context(meta_error::ExternalSnafu)
331 } else {
332 UnsupportedSnafu {
333 operation: "manage_region_follower",
334 }
335 .fail()
336 }
337 }
338
339 async fn query_procedure_state(
340 &self,
341 _ctx: &ExecutorContext,
342 pid: &str,
343 ) -> MetaResult<ProcedureStateResponse> {
344 self.query_procedure_state(pid)
345 .await
346 .map_err(BoxedError::new)
347 .context(meta_error::ExternalSnafu)
348 }
349
350 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
351 self.procedure_client()
352 .map_err(BoxedError::new)
353 .context(meta_error::ExternalSnafu)?
354 .list_procedures()
355 .await
356 .map_err(BoxedError::new)
357 .context(meta_error::ExternalSnafu)
358 }
359}
360
361#[allow(deprecated)]
363#[async_trait::async_trait]
364impl ClusterInfo for MetaClient {
365 type Error = Error;
366
367 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
368 let cluster_client = self.cluster_client()?;
369
370 let (get_metasrv_nodes, nodes_key_prefix) = match role {
371 None => (true, Some(NodeInfoKey::key_prefix())),
372 Some(ClusterRole::Metasrv) => (true, None),
373 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
374 };
375
376 let mut nodes = if get_metasrv_nodes {
377 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
380 cluster_client.get_metasrv_peers().await?;
381 followers
382 .into_iter()
383 .map(|node| {
384 if let Some(node_info) = node.info {
385 NodeInfo {
386 peer: node.peer.unwrap_or_default(),
387 last_activity_ts,
388 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
389 version: node_info.version,
390 git_commit: node_info.git_commit,
391 start_time_ms: node_info.start_time_ms,
392 total_cpu_millicores: node_info.total_cpu_millicores,
393 total_memory_bytes: node_info.total_memory_bytes,
394 cpu_usage_millicores: node_info.cpu_usage_millicores,
395 memory_usage_bytes: node_info.memory_usage_bytes,
396 hostname: node_info.hostname,
397 }
398 } else {
399 NodeInfo {
401 peer: node.peer.unwrap_or_default(),
402 last_activity_ts,
403 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
404 version: node.version,
405 git_commit: node.git_commit,
406 start_time_ms: node.start_time_ms,
407 total_cpu_millicores: node.cpus as i64,
408 total_memory_bytes: node.memory_bytes as i64,
409 cpu_usage_millicores: 0,
410 memory_usage_bytes: 0,
411 hostname: "".to_string(),
412 }
413 }
414 })
415 .chain(leader.into_iter().map(|node| {
416 if let Some(node_info) = node.info {
417 NodeInfo {
418 peer: node.peer.unwrap_or_default(),
419 last_activity_ts,
420 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
421 version: node_info.version,
422 git_commit: node_info.git_commit,
423 start_time_ms: node_info.start_time_ms,
424 total_cpu_millicores: node_info.total_cpu_millicores,
425 total_memory_bytes: node_info.total_memory_bytes,
426 cpu_usage_millicores: node_info.cpu_usage_millicores,
427 memory_usage_bytes: node_info.memory_usage_bytes,
428 hostname: node_info.hostname,
429 }
430 } else {
431 NodeInfo {
433 peer: node.peer.unwrap_or_default(),
434 last_activity_ts,
435 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
436 version: node.version,
437 git_commit: node.git_commit,
438 start_time_ms: node.start_time_ms,
439 total_cpu_millicores: node.cpus as i64,
440 total_memory_bytes: node.memory_bytes as i64,
441 cpu_usage_millicores: 0,
442 memory_usage_bytes: 0,
443 hostname: "".to_string(),
444 }
445 }
446 }))
447 .collect::<Vec<_>>()
448 } else {
449 Vec::new()
450 };
451
452 if let Some(prefix) = nodes_key_prefix {
453 let req = RangeRequest::new().with_prefix(prefix);
454 let res = cluster_client.range(req).await?;
455 for kv in res.kvs {
456 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
457 }
458 }
459
460 Ok(nodes)
461 }
462
463 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
464 let cluster_kv_backend = Arc::new(self.cluster_client()?);
465 let range_prefix = DatanodeStatKey::prefix_key();
466 let req = RangeRequest::new().with_prefix(range_prefix);
467 let stream =
468 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
469 let mut datanode_stats = stream
470 .try_collect::<Vec<_>>()
471 .await
472 .context(ConvertMetaResponseSnafu)?;
473 let region_stats = datanode_stats
474 .iter_mut()
475 .flat_map(|datanode_stat| {
476 let last = datanode_stat.stats.pop();
477 last.map(|stat| stat.region_stats).unwrap_or_default()
478 })
479 .collect::<Vec<_>>();
480
481 Ok(region_stats)
482 }
483
484 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
485 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
486 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
487 let flow_state_manager = FlowStateManager::new(cluster_backend);
488 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
489
490 Ok(res.map(|r| r.into()))
491 }
492}
493
494fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
495 DatanodeStatValue::try_from(kv.value)
496 .map_err(BoxedError::new)
497 .context(ExternalSnafu)
498}
499
500impl MetaClient {
501 pub fn new(id: Id) -> Self {
502 Self {
503 id,
504 ..Default::default()
505 }
506 }
507
508 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
509 Self {
510 id,
511 channel_manager,
512 ..Default::default()
513 }
514 }
515
516 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
517 where
518 U: AsRef<str>,
519 A: AsRef<[U]> + Clone,
520 {
521 info!("MetaClient channel config: {:?}", self.channel_config());
522
523 if let Some(client) = &mut self.region_follower {
524 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
525 client.start(&urls).await?;
526 info!("Region follower client started");
527 }
528 if let Some(client) = &mut self.heartbeat {
529 client.start(urls.clone()).await?;
530 info!("Heartbeat client started");
531 }
532 if let Some(client) = &mut self.store {
533 client.start(urls.clone()).await?;
534 info!("Store client started");
535 }
536 if let Some(client) = &mut self.procedure {
537 client.start(urls.clone()).await?;
538 info!("DDL client started");
539 }
540 if let Some(client) = &mut self.cluster {
541 client.start(urls).await?;
542 info!("Cluster client started");
543 }
544
545 Ok(())
546 }
547
548 pub(crate) async fn start_with<U, A>(
550 &mut self,
551 leader_provider: LeaderProviderRef,
552 peers: A,
553 ) -> Result<()>
554 where
555 U: AsRef<str>,
556 A: AsRef<[U]> + Clone,
557 {
558 if let Some(client) = &self.region_follower {
559 info!("Starting region follower client ...");
560 client.start_with(leader_provider.clone()).await?;
561 }
562
563 if let Some(client) = &self.heartbeat {
564 info!("Starting heartbeat client ...");
565 client.start_with(leader_provider.clone()).await?;
566 }
567
568 if let Some(client) = &mut self.store {
569 info!("Starting store client ...");
570 client.start(peers.clone()).await?;
571 }
572
573 if let Some(client) = &self.procedure {
574 info!("Starting procedure client ...");
575 client.start_with(leader_provider.clone()).await?;
576 }
577
578 if let Some(client) = &mut self.cluster {
579 info!("Starting cluster client ...");
580 client.start_with(leader_provider).await?;
581 }
582 Ok(())
583 }
584
585 pub async fn ask_leader(&self) -> Result<String> {
588 self.heartbeat_client()?.ask_leader().await
589 }
590
591 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
598 self.heartbeat_client()?.heartbeat().await
599 }
600
601 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
603 self.store_client()?
604 .range(req.into())
605 .await?
606 .try_into()
607 .context(ConvertMetaResponseSnafu)
608 }
609
610 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
612 self.store_client()?
613 .put(req.into())
614 .await?
615 .try_into()
616 .context(ConvertMetaResponseSnafu)
617 }
618
619 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
621 self.store_client()?
622 .batch_get(req.into())
623 .await?
624 .try_into()
625 .context(ConvertMetaResponseSnafu)
626 }
627
628 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
630 self.store_client()?
631 .batch_put(req.into())
632 .await?
633 .try_into()
634 .context(ConvertMetaResponseSnafu)
635 }
636
637 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
639 self.store_client()?
640 .batch_delete(req.into())
641 .await?
642 .try_into()
643 .context(ConvertMetaResponseSnafu)
644 }
645
646 pub async fn compare_and_put(
649 &self,
650 req: CompareAndPutRequest,
651 ) -> Result<CompareAndPutResponse> {
652 self.store_client()?
653 .compare_and_put(req.into())
654 .await?
655 .try_into()
656 .context(ConvertMetaResponseSnafu)
657 }
658
659 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
661 self.store_client()?
662 .delete_range(req.into())
663 .await?
664 .try_into()
665 .context(ConvertMetaResponseSnafu)
666 }
667
668 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
670 self.procedure_client()?.query_procedure_state(pid).await
671 }
672
673 pub async fn migrate_region(
675 &self,
676 request: MigrateRegionRequest,
677 ) -> Result<MigrateRegionResponse> {
678 self.procedure_client()?
679 .migrate_region(
680 request.region_id,
681 request.from_peer,
682 request.to_peer,
683 request.timeout,
684 )
685 .await
686 }
687
688 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
690 self.procedure_client()?.reconcile(request).await
691 }
692
693 pub async fn submit_ddl_task(
695 &self,
696 req: SubmitDdlTaskRequest,
697 ) -> Result<SubmitDdlTaskResponse> {
698 let res = self
699 .procedure_client()?
700 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
701 .await?
702 .try_into()
703 .context(ConvertMetaResponseSnafu)?;
704
705 Ok(res)
706 }
707
708 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
709 self.heartbeat.clone().context(NotStartedSnafu {
710 name: "heartbeat_client",
711 })
712 }
713
714 pub fn store_client(&self) -> Result<StoreClient> {
715 self.store.clone().context(NotStartedSnafu {
716 name: "store_client",
717 })
718 }
719
720 pub fn procedure_client(&self) -> Result<ProcedureClient> {
721 self.procedure.clone().context(NotStartedSnafu {
722 name: "procedure_client",
723 })
724 }
725
726 pub fn cluster_client(&self) -> Result<ClusterClient> {
727 self.cluster.clone().context(NotStartedSnafu {
728 name: "cluster_client",
729 })
730 }
731
732 pub fn channel_config(&self) -> &ChannelConfig {
733 self.channel_manager.config()
734 }
735
736 pub fn id(&self) -> Id {
737 self.id
738 }
739}
740
741#[cfg(test)]
742mod tests {
743 use std::sync::atomic::{AtomicUsize, Ordering};
744
745 use api::v1::meta::{HeartbeatRequest, Peer};
746 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
747 use rand::Rng;
748
749 use super::*;
750 use crate::error;
751 use crate::mocks::{self, MockMetaContext};
752
753 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
754
755 struct TestClient {
756 ns: String,
757 client: MetaClient,
758 meta_ctx: MockMetaContext,
759 }
760
761 impl TestClient {
762 async fn new(ns: impl Into<String>) -> Self {
763 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
765 Self {
766 ns: ns.into(),
767 client,
768 meta_ctx,
769 }
770 }
771
772 fn key(&self, name: &str) -> Vec<u8> {
773 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
774 }
775
776 async fn gen_data(&self) {
777 for i in 0..10 {
778 let req = PutRequest::new()
779 .with_key(self.key(&format!("key-{i}")))
780 .with_value(format!("{}-{}", "value", i).into_bytes())
781 .with_prev_kv();
782 let res = self.client.put(req).await;
783 let _ = res.unwrap();
784 }
785 }
786
787 async fn clear_data(&self) {
788 let req =
789 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
790 let res = self.client.delete_range(req).await;
791 let _ = res.unwrap();
792 }
793
794 #[allow(dead_code)]
795 fn kv_backend(&self) -> KvBackendRef {
796 self.meta_ctx.kv_backend.clone()
797 }
798
799 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
800 self.meta_ctx.in_memory.clone()
801 }
802 }
803
804 async fn new_client(ns: impl Into<String>) -> TestClient {
805 let client = TestClient::new(ns).await;
806 client.clear_data().await;
807 client
808 }
809
810 #[tokio::test]
811 async fn test_meta_client_builder() {
812 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
813
814 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
815 .enable_heartbeat()
816 .build();
817 let _ = meta_client.heartbeat_client().unwrap();
818 assert!(meta_client.store_client().is_err());
819 meta_client.start(urls).await.unwrap();
820
821 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
822 assert!(meta_client.heartbeat_client().is_err());
823 assert!(meta_client.store_client().is_err());
824 meta_client.start(urls).await.unwrap();
825
826 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
827 .enable_store()
828 .build();
829 assert!(meta_client.heartbeat_client().is_err());
830 let _ = meta_client.store_client().unwrap();
831 meta_client.start(urls).await.unwrap();
832
833 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
834 .enable_heartbeat()
835 .enable_store()
836 .build();
837 assert_eq!(2, meta_client.id());
838 assert_eq!(2, meta_client.id());
839 let _ = meta_client.heartbeat_client().unwrap();
840 let _ = meta_client.store_client().unwrap();
841 meta_client.start(urls).await.unwrap();
842 }
843
844 #[tokio::test]
845 async fn test_not_start_heartbeat_client() {
846 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
847 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
848 .enable_store()
849 .build();
850 meta_client.start(urls).await.unwrap();
851 let res = meta_client.ask_leader().await;
852 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
853 }
854
855 #[tokio::test]
856 async fn test_not_start_store_client() {
857 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
858 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
859 .enable_heartbeat()
860 .build();
861
862 meta_client.start(urls).await.unwrap();
863 let res = meta_client.put(PutRequest::default()).await;
864 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
865 }
866
867 #[tokio::test]
868 async fn test_ask_leader() {
869 let tc = new_client("test_ask_leader").await;
870 tc.client.ask_leader().await.unwrap();
871 }
872
873 #[tokio::test]
874 async fn test_heartbeat() {
875 let tc = new_client("test_heartbeat").await;
876 let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
877 let request_sent = Arc::new(AtomicUsize::new(0));
880 let request_sent_clone = request_sent.clone();
881 let _handle = tokio::spawn(async move {
882 for _ in 0..5 {
883 let req = HeartbeatRequest {
884 peer: Some(Peer {
885 id: 1,
886 addr: "meta_client_peer".to_string(),
887 }),
888 ..Default::default()
889 };
890 sender.send(req).await.unwrap();
891 request_sent_clone.fetch_add(1, Ordering::Relaxed);
892 }
893 });
894
895 let heartbeat_count = Arc::new(AtomicUsize::new(0));
896 let heartbeat_count_clone = heartbeat_count.clone();
897 let handle = tokio::spawn(async move {
898 while let Some(_resp) = receiver.message().await.unwrap() {
899 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
900 }
901 });
902
903 handle.await.unwrap();
904 assert_eq!(
906 request_sent.load(Ordering::Relaxed) + 1,
907 heartbeat_count.load(Ordering::Relaxed)
908 );
909 }
910
911 #[tokio::test]
912 async fn test_range_get() {
913 let tc = new_client("test_range_get").await;
914 tc.gen_data().await;
915
916 let key = tc.key("key-0");
917 let req = RangeRequest::new().with_key(key.as_slice());
918 let res = tc.client.range(req).await;
919 let mut kvs = res.unwrap().take_kvs();
920 assert_eq!(1, kvs.len());
921 let mut kv = kvs.pop().unwrap();
922 assert_eq!(key, kv.take_key());
923 assert_eq!(b"value-0".to_vec(), kv.take_value());
924 }
925
926 #[tokio::test]
927 async fn test_range_get_prefix() {
928 let tc = new_client("test_range_get_prefix").await;
929 tc.gen_data().await;
930
931 let req = RangeRequest::new().with_prefix(tc.key("key-"));
932 let res = tc.client.range(req).await;
933 let kvs = res.unwrap().take_kvs();
934 assert_eq!(10, kvs.len());
935 for (i, mut kv) in kvs.into_iter().enumerate() {
936 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
937 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
938 }
939 }
940
941 #[tokio::test]
942 async fn test_range() {
943 let tc = new_client("test_range").await;
944 tc.gen_data().await;
945
946 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
947 let res = tc.client.range(req).await;
948 let kvs = res.unwrap().take_kvs();
949 assert_eq!(3, kvs.len());
950 for (i, mut kv) in kvs.into_iter().enumerate() {
951 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
952 assert_eq!(
953 format!("{}-{}", "value", i + 5).into_bytes(),
954 kv.take_value()
955 );
956 }
957 }
958
959 #[tokio::test]
960 async fn test_range_keys_only() {
961 let tc = new_client("test_range_keys_only").await;
962 tc.gen_data().await;
963
964 let req = RangeRequest::new()
965 .with_range(tc.key("key-5"), tc.key("key-8"))
966 .with_keys_only();
967 let res = tc.client.range(req).await;
968 let kvs = res.unwrap().take_kvs();
969 assert_eq!(3, kvs.len());
970 for (i, mut kv) in kvs.into_iter().enumerate() {
971 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
972 assert!(kv.take_value().is_empty());
973 }
974 }
975
976 #[tokio::test]
977 async fn test_put() {
978 let tc = new_client("test_put").await;
979
980 let req = PutRequest::new()
981 .with_key(tc.key("key"))
982 .with_value(b"value".to_vec());
983 let res = tc.client.put(req).await;
984 assert!(res.unwrap().prev_kv.is_none());
985 }
986
987 #[tokio::test]
988 async fn test_put_with_prev_kv() {
989 let tc = new_client("test_put_with_prev_kv").await;
990
991 let key = tc.key("key");
992 let req = PutRequest::new()
993 .with_key(key.as_slice())
994 .with_value(b"value".to_vec())
995 .with_prev_kv();
996 let res = tc.client.put(req).await;
997 assert!(res.unwrap().prev_kv.is_none());
998
999 let req = PutRequest::new()
1000 .with_key(key.as_slice())
1001 .with_value(b"value1".to_vec())
1002 .with_prev_kv();
1003 let res = tc.client.put(req).await;
1004 let mut kv = res.unwrap().prev_kv.unwrap();
1005 assert_eq!(key, kv.take_key());
1006 assert_eq!(b"value".to_vec(), kv.take_value());
1007 }
1008
1009 #[tokio::test]
1010 async fn test_batch_put() {
1011 let tc = new_client("test_batch_put").await;
1012
1013 let mut req = BatchPutRequest::new();
1014 for i in 0..275 {
1015 req = req.add_kv(
1016 tc.key(&format!("key-{}", i)),
1017 format!("value-{}", i).into_bytes(),
1018 );
1019 }
1020
1021 let res = tc.client.batch_put(req).await;
1022 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1023
1024 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1025 let res = tc.client.range(req).await;
1026 let kvs = res.unwrap().take_kvs();
1027 assert_eq!(275, kvs.len());
1028 }
1029
1030 #[tokio::test]
1031 async fn test_batch_get() {
1032 let tc = new_client("test_batch_get").await;
1033 tc.gen_data().await;
1034
1035 let mut req = BatchGetRequest::default();
1036 for i in 0..256 {
1037 req = req.add_key(tc.key(&format!("key-{}", i)));
1038 }
1039 let res = tc.client.batch_get(req).await.unwrap();
1040 assert_eq!(10, res.kvs.len());
1041
1042 let req = BatchGetRequest::default()
1043 .add_key(tc.key("key-1"))
1044 .add_key(tc.key("key-999"));
1045 let res = tc.client.batch_get(req).await.unwrap();
1046 assert_eq!(1, res.kvs.len());
1047 }
1048
1049 #[tokio::test]
1050 async fn test_batch_put_with_prev_kv() {
1051 let tc = new_client("test_batch_put_with_prev_kv").await;
1052
1053 let key = tc.key("key");
1054 let key2 = tc.key("key2");
1055 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1056 let res = tc.client.batch_put(req).await;
1057 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1058
1059 let req = BatchPutRequest::new()
1060 .add_kv(key.as_slice(), b"value-".to_vec())
1061 .add_kv(key2.as_slice(), b"value2-".to_vec())
1062 .with_prev_kv();
1063 let res = tc.client.batch_put(req).await;
1064 let mut kvs = res.unwrap().take_prev_kvs();
1065 assert_eq!(1, kvs.len());
1066 let mut kv = kvs.pop().unwrap();
1067 assert_eq!(key, kv.take_key());
1068 assert_eq!(b"value".to_vec(), kv.take_value());
1069 }
1070
1071 #[tokio::test]
1072 async fn test_compare_and_put() {
1073 let tc = new_client("test_compare_and_put").await;
1074
1075 let key = tc.key("key");
1076 let req = CompareAndPutRequest::new()
1077 .with_key(key.as_slice())
1078 .with_expect(b"expect".to_vec())
1079 .with_value(b"value".to_vec());
1080 let res = tc.client.compare_and_put(req).await;
1081 assert!(!res.unwrap().is_success());
1082
1083 let req = CompareAndPutRequest::new()
1085 .with_key(key.as_slice())
1086 .with_value(b"value".to_vec());
1087 let res = tc.client.compare_and_put(req).await;
1088 let mut res = res.unwrap();
1089 assert!(res.is_success());
1090 assert!(res.take_prev_kv().is_none());
1091
1092 let req = CompareAndPutRequest::new()
1094 .with_key(key.as_slice())
1095 .with_expect(b"not_eq".to_vec())
1096 .with_value(b"value2".to_vec());
1097 let res = tc.client.compare_and_put(req).await;
1098 let mut res = res.unwrap();
1099 assert!(!res.is_success());
1100 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1101
1102 let req = CompareAndPutRequest::new()
1104 .with_key(key.as_slice())
1105 .with_expect(b"value".to_vec())
1106 .with_value(b"value2".to_vec());
1107 let res = tc.client.compare_and_put(req).await;
1108 let mut res = res.unwrap();
1109 assert!(res.is_success());
1110
1111 assert!(res.take_prev_kv().is_none());
1113 }
1114
1115 #[tokio::test]
1116 async fn test_delete_with_key() {
1117 let tc = new_client("test_delete_with_key").await;
1118 tc.gen_data().await;
1119
1120 let req = DeleteRangeRequest::new()
1121 .with_key(tc.key("key-0"))
1122 .with_prev_kv();
1123 let res = tc.client.delete_range(req).await;
1124 let mut res = res.unwrap();
1125 assert_eq!(1, res.deleted());
1126 let mut kvs = res.take_prev_kvs();
1127 assert_eq!(1, kvs.len());
1128 let mut kv = kvs.pop().unwrap();
1129 assert_eq!(b"value-0".to_vec(), kv.take_value());
1130 }
1131
1132 #[tokio::test]
1133 async fn test_delete_with_prefix() {
1134 let tc = new_client("test_delete_with_prefix").await;
1135 tc.gen_data().await;
1136
1137 let req = DeleteRangeRequest::new()
1138 .with_prefix(tc.key("key-"))
1139 .with_prev_kv();
1140 let res = tc.client.delete_range(req).await;
1141 let mut res = res.unwrap();
1142 assert_eq!(10, res.deleted());
1143 let kvs = res.take_prev_kvs();
1144 assert_eq!(10, kvs.len());
1145 for (i, mut kv) in kvs.into_iter().enumerate() {
1146 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1147 }
1148 }
1149
1150 #[tokio::test]
1151 async fn test_delete_with_range() {
1152 let tc = new_client("test_delete_with_range").await;
1153 tc.gen_data().await;
1154
1155 let req = DeleteRangeRequest::new()
1156 .with_range(tc.key("key-2"), tc.key("key-7"))
1157 .with_prev_kv();
1158 let res = tc.client.delete_range(req).await;
1159 let mut res = res.unwrap();
1160 assert_eq!(5, res.deleted());
1161 let kvs = res.take_prev_kvs();
1162 assert_eq!(5, kvs.len());
1163 for (i, mut kv) in kvs.into_iter().enumerate() {
1164 assert_eq!(
1165 format!("{}-{}", "value", i + 2).into_bytes(),
1166 kv.take_value()
1167 );
1168 }
1169 }
1170
1171 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1172 Ok(())
1173 }
1174
1175 #[tokio::test]
1176 async fn test_cluster_client_adaptive_range() {
1177 let tx = new_client("test_cluster_client").await;
1178 let in_memory = tx.in_memory().unwrap();
1179 let cluster_client = tx.client.cluster_client().unwrap();
1180 let mut rng = rand::rng();
1181
1182 for i in 0..10 {
1184 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1185 in_memory
1186 .put(
1187 PutRequest::new()
1188 .with_key(format!("__prefix/{i}").as_bytes())
1189 .with_value(data.clone()),
1190 )
1191 .await
1192 .unwrap();
1193 }
1194
1195 let req = RangeRequest::new().with_prefix(b"__prefix/");
1196 let stream =
1197 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1198
1199 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1200 assert_eq!(10, res.len());
1201 }
1202}