1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{
28 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
29};
30pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
31use cluster::Client as ClusterClient;
32pub use cluster::ClusterKvBackend;
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
35use common_meta::cluster::{
36 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
37};
38use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
39use common_meta::error::{
40 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
41};
42use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
43use common_meta::kv_backend::KvBackendRef;
44use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
45use common_meta::range_stream::PaginationStream;
46use common_meta::rpc::KeyValue;
47use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
48use common_meta::rpc::procedure::{
49 AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
50 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
51 RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
52};
53use common_meta::rpc::store::{
54 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
55 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
56 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
57};
58use common_telemetry::info;
59use futures::TryStreamExt;
60use heartbeat::Client as HeartbeatClient;
61use procedure::Client as ProcedureClient;
62use snafu::{OptionExt, ResultExt};
63use store::Client as StoreClient;
64
65pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
66use crate::error::{
67 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
68 Result,
69};
70
71pub type Id = u64;
72
73const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
74const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
75const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
76
77#[derive(Clone, Debug, Default)]
78pub struct MetaClientBuilder {
79 id: Id,
80 role: Role,
81 enable_heartbeat: bool,
82 enable_store: bool,
83 enable_procedure: bool,
84 enable_access_cluster_info: bool,
85 region_follower: Option<RegionFollowerClientRef>,
86 channel_manager: Option<ChannelManager>,
87 ddl_channel_manager: Option<ChannelManager>,
88 heartbeat_channel_manager: Option<ChannelManager>,
89}
90
91impl MetaClientBuilder {
92 pub fn new(member_id: u64, role: Role) -> Self {
93 Self {
94 id: member_id,
95 role,
96 ..Default::default()
97 }
98 }
99
100 pub fn frontend_default_options() -> Self {
102 Self::new(0, Role::Frontend)
104 .enable_store()
105 .enable_heartbeat()
106 .enable_procedure()
107 .enable_access_cluster_info()
108 }
109
110 pub fn datanode_default_options(member_id: u64) -> Self {
112 Self::new(member_id, Role::Datanode)
113 .enable_store()
114 .enable_heartbeat()
115 }
116
117 pub fn flownode_default_options(member_id: u64) -> Self {
119 Self::new(member_id, Role::Flownode)
120 .enable_store()
121 .enable_heartbeat()
122 .enable_procedure()
123 .enable_access_cluster_info()
124 }
125
126 pub fn enable_heartbeat(self) -> Self {
127 Self {
128 enable_heartbeat: true,
129 ..self
130 }
131 }
132
133 pub fn enable_store(self) -> Self {
134 Self {
135 enable_store: true,
136 ..self
137 }
138 }
139
140 pub fn enable_procedure(self) -> Self {
141 Self {
142 enable_procedure: true,
143 ..self
144 }
145 }
146
147 pub fn enable_access_cluster_info(self) -> Self {
148 Self {
149 enable_access_cluster_info: true,
150 ..self
151 }
152 }
153
154 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
155 Self {
156 channel_manager: Some(channel_manager),
157 ..self
158 }
159 }
160
161 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
162 Self {
163 ddl_channel_manager: Some(channel_manager),
164 ..self
165 }
166 }
167
168 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
169 Self {
170 heartbeat_channel_manager: Some(channel_manager),
171 ..self
172 }
173 }
174
175 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
176 Self {
177 region_follower: Some(region_follower),
178 ..self
179 }
180 }
181
182 pub fn build(self) -> MetaClient {
183 let mut client = if let Some(mgr) = self.channel_manager {
184 MetaClient::with_channel_manager(self.id, mgr)
185 } else {
186 MetaClient::new(self.id)
187 };
188
189 let mgr = client.channel_manager.clone();
190
191 if self.enable_heartbeat {
192 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
193 client.heartbeat = Some(HeartbeatClient::new(
194 self.id,
195 self.role,
196 mgr,
197 DEFAULT_ASK_LEADER_MAX_RETRY,
198 ));
199 }
200
201 if self.enable_store {
202 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
203 }
204
205 if self.enable_procedure {
206 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
207 client.procedure = Some(ProcedureClient::new(
208 self.id,
209 self.role,
210 mgr,
211 DEFAULT_SUBMIT_DDL_MAX_RETRY,
212 ));
213 }
214
215 if self.enable_access_cluster_info {
216 client.cluster = Some(ClusterClient::new(
217 self.id,
218 self.role,
219 mgr,
220 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
221 ))
222 }
223
224 if let Some(region_follower) = self.region_follower {
225 client.region_follower = Some(region_follower);
226 }
227
228 client
229 }
230}
231
232#[derive(Debug, Default)]
233pub struct MetaClient {
234 id: Id,
235 channel_manager: ChannelManager,
236 heartbeat: Option<HeartbeatClient>,
237 store: Option<StoreClient>,
238 procedure: Option<ProcedureClient>,
239 cluster: Option<ClusterClient>,
240 region_follower: Option<RegionFollowerClientRef>,
241}
242
243pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
244
245#[async_trait::async_trait]
247pub trait RegionFollowerClient: Sync + Send + Debug {
248 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
249
250 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
251
252 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
253
254 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
255
256 async fn start(&self, urls: &[&str]) -> Result<()>;
257
258 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
259}
260
261#[async_trait::async_trait]
262impl ProcedureExecutor for MetaClient {
263 async fn submit_ddl_task(
264 &self,
265 _ctx: &ExecutorContext,
266 request: SubmitDdlTaskRequest,
267 ) -> MetaResult<SubmitDdlTaskResponse> {
268 self.submit_ddl_task(request)
269 .await
270 .map_err(BoxedError::new)
271 .context(meta_error::ExternalSnafu)
272 }
273
274 async fn migrate_region(
275 &self,
276 _ctx: &ExecutorContext,
277 request: MigrateRegionRequest,
278 ) -> MetaResult<MigrateRegionResponse> {
279 self.migrate_region(request)
280 .await
281 .map_err(BoxedError::new)
282 .context(meta_error::ExternalSnafu)
283 }
284
285 async fn reconcile(
286 &self,
287 _ctx: &ExecutorContext,
288 request: ReconcileRequest,
289 ) -> MetaResult<ReconcileResponse> {
290 self.reconcile(request)
291 .await
292 .map_err(BoxedError::new)
293 .context(meta_error::ExternalSnafu)
294 }
295
296 async fn manage_region_follower(
297 &self,
298 _ctx: &ExecutorContext,
299 request: ManageRegionFollowerRequest,
300 ) -> MetaResult<()> {
301 if let Some(region_follower) = &self.region_follower {
302 match request {
303 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
304 region_follower
305 .add_region_follower(add_region_follower_request)
306 .await
307 }
308 ManageRegionFollowerRequest::RemoveRegionFollower(
309 remove_region_follower_request,
310 ) => {
311 region_follower
312 .remove_region_follower(remove_region_follower_request)
313 .await
314 }
315 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
316 region_follower
317 .add_table_follower(add_table_follower_request)
318 .await
319 }
320 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
321 region_follower
322 .remove_table_follower(remove_table_follower_request)
323 .await
324 }
325 }
326 .map_err(BoxedError::new)
327 .context(meta_error::ExternalSnafu)
328 } else {
329 UnsupportedSnafu {
330 operation: "manage_region_follower",
331 }
332 .fail()
333 }
334 }
335
336 async fn query_procedure_state(
337 &self,
338 _ctx: &ExecutorContext,
339 pid: &str,
340 ) -> MetaResult<ProcedureStateResponse> {
341 self.query_procedure_state(pid)
342 .await
343 .map_err(BoxedError::new)
344 .context(meta_error::ExternalSnafu)
345 }
346
347 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
348 self.procedure_client()
349 .map_err(BoxedError::new)
350 .context(meta_error::ExternalSnafu)?
351 .list_procedures()
352 .await
353 .map_err(BoxedError::new)
354 .context(meta_error::ExternalSnafu)
355 }
356}
357
358#[allow(deprecated)]
360#[async_trait::async_trait]
361impl ClusterInfo for MetaClient {
362 type Error = Error;
363
364 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
365 let cluster_client = self.cluster_client()?;
366
367 let (get_metasrv_nodes, nodes_key_prefix) = match role {
368 None => (true, Some(NodeInfoKey::key_prefix())),
369 Some(ClusterRole::Metasrv) => (true, None),
370 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
371 };
372
373 let mut nodes = if get_metasrv_nodes {
374 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
377 cluster_client.get_metasrv_peers().await?;
378 followers
379 .into_iter()
380 .map(|node| {
381 if let Some(node_info) = node.info {
382 NodeInfo {
383 peer: node.peer.unwrap_or_default(),
384 last_activity_ts,
385 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
386 version: node_info.version,
387 git_commit: node_info.git_commit,
388 start_time_ms: node_info.start_time_ms,
389 total_cpu_millicores: node_info.total_cpu_millicores,
390 total_memory_bytes: node_info.total_memory_bytes,
391 cpu_usage_millicores: node_info.cpu_usage_millicores,
392 memory_usage_bytes: node_info.memory_usage_bytes,
393 hostname: node_info.hostname,
394 }
395 } else {
396 NodeInfo {
398 peer: node.peer.unwrap_or_default(),
399 last_activity_ts,
400 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
401 version: node.version,
402 git_commit: node.git_commit,
403 start_time_ms: node.start_time_ms,
404 total_cpu_millicores: node.cpus as i64,
405 total_memory_bytes: node.memory_bytes as i64,
406 cpu_usage_millicores: 0,
407 memory_usage_bytes: 0,
408 hostname: "".to_string(),
409 }
410 }
411 })
412 .chain(leader.into_iter().map(|node| {
413 if let Some(node_info) = node.info {
414 NodeInfo {
415 peer: node.peer.unwrap_or_default(),
416 last_activity_ts,
417 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
418 version: node_info.version,
419 git_commit: node_info.git_commit,
420 start_time_ms: node_info.start_time_ms,
421 total_cpu_millicores: node_info.total_cpu_millicores,
422 total_memory_bytes: node_info.total_memory_bytes,
423 cpu_usage_millicores: node_info.cpu_usage_millicores,
424 memory_usage_bytes: node_info.memory_usage_bytes,
425 hostname: node_info.hostname,
426 }
427 } else {
428 NodeInfo {
430 peer: node.peer.unwrap_or_default(),
431 last_activity_ts,
432 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
433 version: node.version,
434 git_commit: node.git_commit,
435 start_time_ms: node.start_time_ms,
436 total_cpu_millicores: node.cpus as i64,
437 total_memory_bytes: node.memory_bytes as i64,
438 cpu_usage_millicores: 0,
439 memory_usage_bytes: 0,
440 hostname: "".to_string(),
441 }
442 }
443 }))
444 .collect::<Vec<_>>()
445 } else {
446 Vec::new()
447 };
448
449 if let Some(prefix) = nodes_key_prefix {
450 let req = RangeRequest::new().with_prefix(prefix);
451 let res = cluster_client.range(req).await?;
452 for kv in res.kvs {
453 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
454 }
455 }
456
457 Ok(nodes)
458 }
459
460 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
461 let cluster_kv_backend = Arc::new(self.cluster_client()?);
462 let range_prefix = DatanodeStatKey::prefix_key();
463 let req = RangeRequest::new().with_prefix(range_prefix);
464 let stream =
465 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
466 let mut datanode_stats = stream
467 .try_collect::<Vec<_>>()
468 .await
469 .context(ConvertMetaResponseSnafu)?;
470 let region_stats = datanode_stats
471 .iter_mut()
472 .flat_map(|datanode_stat| {
473 let last = datanode_stat.stats.pop();
474 last.map(|stat| stat.region_stats).unwrap_or_default()
475 })
476 .collect::<Vec<_>>();
477
478 Ok(region_stats)
479 }
480
481 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
482 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
483 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
484 let flow_state_manager = FlowStateManager::new(cluster_backend);
485 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
486
487 Ok(res.map(|r| r.into()))
488 }
489}
490
491fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
492 DatanodeStatValue::try_from(kv.value)
493 .map_err(BoxedError::new)
494 .context(ExternalSnafu)
495}
496
497impl MetaClient {
498 pub fn new(id: Id) -> Self {
499 Self {
500 id,
501 ..Default::default()
502 }
503 }
504
505 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
506 Self {
507 id,
508 channel_manager,
509 ..Default::default()
510 }
511 }
512
513 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
514 where
515 U: AsRef<str>,
516 A: AsRef<[U]> + Clone,
517 {
518 info!("MetaClient channel config: {:?}", self.channel_config());
519
520 if let Some(client) = &mut self.region_follower {
521 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
522 client.start(&urls).await?;
523 info!("Region follower client started");
524 }
525 if let Some(client) = &mut self.heartbeat {
526 client.start(urls.clone()).await?;
527 info!("Heartbeat client started");
528 }
529 if let Some(client) = &mut self.store {
530 client.start(urls.clone()).await?;
531 info!("Store client started");
532 }
533 if let Some(client) = &mut self.procedure {
534 client.start(urls.clone()).await?;
535 info!("DDL client started");
536 }
537 if let Some(client) = &mut self.cluster {
538 client.start(urls).await?;
539 info!("Cluster client started");
540 }
541
542 Ok(())
543 }
544
545 pub(crate) async fn start_with<U, A>(
547 &mut self,
548 leader_provider: LeaderProviderRef,
549 peers: A,
550 ) -> Result<()>
551 where
552 U: AsRef<str>,
553 A: AsRef<[U]> + Clone,
554 {
555 if let Some(client) = &self.region_follower {
556 info!("Starting region follower client ...");
557 client.start_with(leader_provider.clone()).await?;
558 }
559
560 if let Some(client) = &self.heartbeat {
561 info!("Starting heartbeat client ...");
562 client.start_with(leader_provider.clone()).await?;
563 }
564
565 if let Some(client) = &mut self.store {
566 info!("Starting store client ...");
567 client.start(peers.clone()).await?;
568 }
569
570 if let Some(client) = &self.procedure {
571 info!("Starting procedure client ...");
572 client.start_with(leader_provider.clone()).await?;
573 }
574
575 if let Some(client) = &mut self.cluster {
576 info!("Starting cluster client ...");
577 client.start_with(leader_provider).await?;
578 }
579 Ok(())
580 }
581
582 pub async fn ask_leader(&self) -> Result<String> {
585 self.heartbeat_client()?.ask_leader().await
586 }
587
588 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
595 self.heartbeat_client()?.heartbeat().await
596 }
597
598 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
600 self.store_client()?
601 .range(req.into())
602 .await?
603 .try_into()
604 .context(ConvertMetaResponseSnafu)
605 }
606
607 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
609 self.store_client()?
610 .put(req.into())
611 .await?
612 .try_into()
613 .context(ConvertMetaResponseSnafu)
614 }
615
616 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
618 self.store_client()?
619 .batch_get(req.into())
620 .await?
621 .try_into()
622 .context(ConvertMetaResponseSnafu)
623 }
624
625 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
627 self.store_client()?
628 .batch_put(req.into())
629 .await?
630 .try_into()
631 .context(ConvertMetaResponseSnafu)
632 }
633
634 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
636 self.store_client()?
637 .batch_delete(req.into())
638 .await?
639 .try_into()
640 .context(ConvertMetaResponseSnafu)
641 }
642
643 pub async fn compare_and_put(
646 &self,
647 req: CompareAndPutRequest,
648 ) -> Result<CompareAndPutResponse> {
649 self.store_client()?
650 .compare_and_put(req.into())
651 .await?
652 .try_into()
653 .context(ConvertMetaResponseSnafu)
654 }
655
656 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
658 self.store_client()?
659 .delete_range(req.into())
660 .await?
661 .try_into()
662 .context(ConvertMetaResponseSnafu)
663 }
664
665 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
667 self.procedure_client()?.query_procedure_state(pid).await
668 }
669
670 pub async fn migrate_region(
672 &self,
673 request: MigrateRegionRequest,
674 ) -> Result<MigrateRegionResponse> {
675 self.procedure_client()?
676 .migrate_region(
677 request.region_id,
678 request.from_peer,
679 request.to_peer,
680 request.timeout,
681 )
682 .await
683 }
684
685 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
687 self.procedure_client()?.reconcile(request).await
688 }
689
690 pub async fn submit_ddl_task(
692 &self,
693 req: SubmitDdlTaskRequest,
694 ) -> Result<SubmitDdlTaskResponse> {
695 let res = self
696 .procedure_client()?
697 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
698 .await?
699 .try_into()
700 .context(ConvertMetaResponseSnafu)?;
701
702 Ok(res)
703 }
704
705 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
706 self.heartbeat.clone().context(NotStartedSnafu {
707 name: "heartbeat_client",
708 })
709 }
710
711 pub fn store_client(&self) -> Result<StoreClient> {
712 self.store.clone().context(NotStartedSnafu {
713 name: "store_client",
714 })
715 }
716
717 pub fn procedure_client(&self) -> Result<ProcedureClient> {
718 self.procedure.clone().context(NotStartedSnafu {
719 name: "procedure_client",
720 })
721 }
722
723 pub fn cluster_client(&self) -> Result<ClusterClient> {
724 self.cluster.clone().context(NotStartedSnafu {
725 name: "cluster_client",
726 })
727 }
728
729 pub fn channel_config(&self) -> &ChannelConfig {
730 self.channel_manager.config()
731 }
732
733 pub fn id(&self) -> Id {
734 self.id
735 }
736}
737
738#[cfg(test)]
739mod tests {
740 use std::sync::atomic::{AtomicUsize, Ordering};
741
742 use api::v1::meta::{HeartbeatRequest, Peer};
743 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
744 use rand::Rng;
745
746 use super::*;
747 use crate::error;
748 use crate::mocks::{self, MockMetaContext};
749
750 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
751
752 struct TestClient {
753 ns: String,
754 client: MetaClient,
755 meta_ctx: MockMetaContext,
756 }
757
758 impl TestClient {
759 async fn new(ns: impl Into<String>) -> Self {
760 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
762 Self {
763 ns: ns.into(),
764 client,
765 meta_ctx,
766 }
767 }
768
769 fn key(&self, name: &str) -> Vec<u8> {
770 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
771 }
772
773 async fn gen_data(&self) {
774 for i in 0..10 {
775 let req = PutRequest::new()
776 .with_key(self.key(&format!("key-{i}")))
777 .with_value(format!("{}-{}", "value", i).into_bytes())
778 .with_prev_kv();
779 let res = self.client.put(req).await;
780 let _ = res.unwrap();
781 }
782 }
783
784 async fn clear_data(&self) {
785 let req =
786 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
787 let res = self.client.delete_range(req).await;
788 let _ = res.unwrap();
789 }
790
791 #[allow(dead_code)]
792 fn kv_backend(&self) -> KvBackendRef {
793 self.meta_ctx.kv_backend.clone()
794 }
795
796 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
797 self.meta_ctx.in_memory.clone()
798 }
799 }
800
801 async fn new_client(ns: impl Into<String>) -> TestClient {
802 let client = TestClient::new(ns).await;
803 client.clear_data().await;
804 client
805 }
806
807 #[tokio::test]
808 async fn test_meta_client_builder() {
809 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
810
811 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
812 .enable_heartbeat()
813 .build();
814 let _ = meta_client.heartbeat_client().unwrap();
815 assert!(meta_client.store_client().is_err());
816 meta_client.start(urls).await.unwrap();
817
818 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
819 assert!(meta_client.heartbeat_client().is_err());
820 assert!(meta_client.store_client().is_err());
821 meta_client.start(urls).await.unwrap();
822
823 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
824 .enable_store()
825 .build();
826 assert!(meta_client.heartbeat_client().is_err());
827 let _ = meta_client.store_client().unwrap();
828 meta_client.start(urls).await.unwrap();
829
830 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
831 .enable_heartbeat()
832 .enable_store()
833 .build();
834 assert_eq!(2, meta_client.id());
835 assert_eq!(2, meta_client.id());
836 let _ = meta_client.heartbeat_client().unwrap();
837 let _ = meta_client.store_client().unwrap();
838 meta_client.start(urls).await.unwrap();
839 }
840
841 #[tokio::test]
842 async fn test_not_start_heartbeat_client() {
843 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
844 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
845 .enable_store()
846 .build();
847 meta_client.start(urls).await.unwrap();
848 let res = meta_client.ask_leader().await;
849 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
850 }
851
852 #[tokio::test]
853 async fn test_not_start_store_client() {
854 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
855 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
856 .enable_heartbeat()
857 .build();
858
859 meta_client.start(urls).await.unwrap();
860 let res = meta_client.put(PutRequest::default()).await;
861 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
862 }
863
864 #[tokio::test]
865 async fn test_ask_leader() {
866 let tc = new_client("test_ask_leader").await;
867 tc.client.ask_leader().await.unwrap();
868 }
869
870 #[tokio::test]
871 async fn test_heartbeat() {
872 let tc = new_client("test_heartbeat").await;
873 let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
874 let request_sent = Arc::new(AtomicUsize::new(0));
877 let request_sent_clone = request_sent.clone();
878 let _handle = tokio::spawn(async move {
879 for _ in 0..5 {
880 let req = HeartbeatRequest {
881 peer: Some(Peer {
882 id: 1,
883 addr: "meta_client_peer".to_string(),
884 }),
885 ..Default::default()
886 };
887 sender.send(req).await.unwrap();
888 request_sent_clone.fetch_add(1, Ordering::Relaxed);
889 }
890 });
891
892 let heartbeat_count = Arc::new(AtomicUsize::new(0));
893 let heartbeat_count_clone = heartbeat_count.clone();
894 let handle = tokio::spawn(async move {
895 while let Some(_resp) = receiver.message().await.unwrap() {
896 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
897 }
898 });
899
900 handle.await.unwrap();
901 assert_eq!(
903 request_sent.load(Ordering::Relaxed) + 1,
904 heartbeat_count.load(Ordering::Relaxed)
905 );
906 }
907
908 #[tokio::test]
909 async fn test_range_get() {
910 let tc = new_client("test_range_get").await;
911 tc.gen_data().await;
912
913 let key = tc.key("key-0");
914 let req = RangeRequest::new().with_key(key.as_slice());
915 let res = tc.client.range(req).await;
916 let mut kvs = res.unwrap().take_kvs();
917 assert_eq!(1, kvs.len());
918 let mut kv = kvs.pop().unwrap();
919 assert_eq!(key, kv.take_key());
920 assert_eq!(b"value-0".to_vec(), kv.take_value());
921 }
922
923 #[tokio::test]
924 async fn test_range_get_prefix() {
925 let tc = new_client("test_range_get_prefix").await;
926 tc.gen_data().await;
927
928 let req = RangeRequest::new().with_prefix(tc.key("key-"));
929 let res = tc.client.range(req).await;
930 let kvs = res.unwrap().take_kvs();
931 assert_eq!(10, kvs.len());
932 for (i, mut kv) in kvs.into_iter().enumerate() {
933 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
934 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
935 }
936 }
937
938 #[tokio::test]
939 async fn test_range() {
940 let tc = new_client("test_range").await;
941 tc.gen_data().await;
942
943 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
944 let res = tc.client.range(req).await;
945 let kvs = res.unwrap().take_kvs();
946 assert_eq!(3, kvs.len());
947 for (i, mut kv) in kvs.into_iter().enumerate() {
948 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
949 assert_eq!(
950 format!("{}-{}", "value", i + 5).into_bytes(),
951 kv.take_value()
952 );
953 }
954 }
955
956 #[tokio::test]
957 async fn test_range_keys_only() {
958 let tc = new_client("test_range_keys_only").await;
959 tc.gen_data().await;
960
961 let req = RangeRequest::new()
962 .with_range(tc.key("key-5"), tc.key("key-8"))
963 .with_keys_only();
964 let res = tc.client.range(req).await;
965 let kvs = res.unwrap().take_kvs();
966 assert_eq!(3, kvs.len());
967 for (i, mut kv) in kvs.into_iter().enumerate() {
968 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
969 assert!(kv.take_value().is_empty());
970 }
971 }
972
973 #[tokio::test]
974 async fn test_put() {
975 let tc = new_client("test_put").await;
976
977 let req = PutRequest::new()
978 .with_key(tc.key("key"))
979 .with_value(b"value".to_vec());
980 let res = tc.client.put(req).await;
981 assert!(res.unwrap().prev_kv.is_none());
982 }
983
984 #[tokio::test]
985 async fn test_put_with_prev_kv() {
986 let tc = new_client("test_put_with_prev_kv").await;
987
988 let key = tc.key("key");
989 let req = PutRequest::new()
990 .with_key(key.as_slice())
991 .with_value(b"value".to_vec())
992 .with_prev_kv();
993 let res = tc.client.put(req).await;
994 assert!(res.unwrap().prev_kv.is_none());
995
996 let req = PutRequest::new()
997 .with_key(key.as_slice())
998 .with_value(b"value1".to_vec())
999 .with_prev_kv();
1000 let res = tc.client.put(req).await;
1001 let mut kv = res.unwrap().prev_kv.unwrap();
1002 assert_eq!(key, kv.take_key());
1003 assert_eq!(b"value".to_vec(), kv.take_value());
1004 }
1005
1006 #[tokio::test]
1007 async fn test_batch_put() {
1008 let tc = new_client("test_batch_put").await;
1009
1010 let mut req = BatchPutRequest::new();
1011 for i in 0..275 {
1012 req = req.add_kv(
1013 tc.key(&format!("key-{}", i)),
1014 format!("value-{}", i).into_bytes(),
1015 );
1016 }
1017
1018 let res = tc.client.batch_put(req).await;
1019 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1020
1021 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1022 let res = tc.client.range(req).await;
1023 let kvs = res.unwrap().take_kvs();
1024 assert_eq!(275, kvs.len());
1025 }
1026
1027 #[tokio::test]
1028 async fn test_batch_get() {
1029 let tc = new_client("test_batch_get").await;
1030 tc.gen_data().await;
1031
1032 let mut req = BatchGetRequest::default();
1033 for i in 0..256 {
1034 req = req.add_key(tc.key(&format!("key-{}", i)));
1035 }
1036 let res = tc.client.batch_get(req).await.unwrap();
1037 assert_eq!(10, res.kvs.len());
1038
1039 let req = BatchGetRequest::default()
1040 .add_key(tc.key("key-1"))
1041 .add_key(tc.key("key-999"));
1042 let res = tc.client.batch_get(req).await.unwrap();
1043 assert_eq!(1, res.kvs.len());
1044 }
1045
1046 #[tokio::test]
1047 async fn test_batch_put_with_prev_kv() {
1048 let tc = new_client("test_batch_put_with_prev_kv").await;
1049
1050 let key = tc.key("key");
1051 let key2 = tc.key("key2");
1052 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1053 let res = tc.client.batch_put(req).await;
1054 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1055
1056 let req = BatchPutRequest::new()
1057 .add_kv(key.as_slice(), b"value-".to_vec())
1058 .add_kv(key2.as_slice(), b"value2-".to_vec())
1059 .with_prev_kv();
1060 let res = tc.client.batch_put(req).await;
1061 let mut kvs = res.unwrap().take_prev_kvs();
1062 assert_eq!(1, kvs.len());
1063 let mut kv = kvs.pop().unwrap();
1064 assert_eq!(key, kv.take_key());
1065 assert_eq!(b"value".to_vec(), kv.take_value());
1066 }
1067
1068 #[tokio::test]
1069 async fn test_compare_and_put() {
1070 let tc = new_client("test_compare_and_put").await;
1071
1072 let key = tc.key("key");
1073 let req = CompareAndPutRequest::new()
1074 .with_key(key.as_slice())
1075 .with_expect(b"expect".to_vec())
1076 .with_value(b"value".to_vec());
1077 let res = tc.client.compare_and_put(req).await;
1078 assert!(!res.unwrap().is_success());
1079
1080 let req = CompareAndPutRequest::new()
1082 .with_key(key.as_slice())
1083 .with_value(b"value".to_vec());
1084 let res = tc.client.compare_and_put(req).await;
1085 let mut res = res.unwrap();
1086 assert!(res.is_success());
1087 assert!(res.take_prev_kv().is_none());
1088
1089 let req = CompareAndPutRequest::new()
1091 .with_key(key.as_slice())
1092 .with_expect(b"not_eq".to_vec())
1093 .with_value(b"value2".to_vec());
1094 let res = tc.client.compare_and_put(req).await;
1095 let mut res = res.unwrap();
1096 assert!(!res.is_success());
1097 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1098
1099 let req = CompareAndPutRequest::new()
1101 .with_key(key.as_slice())
1102 .with_expect(b"value".to_vec())
1103 .with_value(b"value2".to_vec());
1104 let res = tc.client.compare_and_put(req).await;
1105 let mut res = res.unwrap();
1106 assert!(res.is_success());
1107
1108 assert!(res.take_prev_kv().is_none());
1110 }
1111
1112 #[tokio::test]
1113 async fn test_delete_with_key() {
1114 let tc = new_client("test_delete_with_key").await;
1115 tc.gen_data().await;
1116
1117 let req = DeleteRangeRequest::new()
1118 .with_key(tc.key("key-0"))
1119 .with_prev_kv();
1120 let res = tc.client.delete_range(req).await;
1121 let mut res = res.unwrap();
1122 assert_eq!(1, res.deleted());
1123 let mut kvs = res.take_prev_kvs();
1124 assert_eq!(1, kvs.len());
1125 let mut kv = kvs.pop().unwrap();
1126 assert_eq!(b"value-0".to_vec(), kv.take_value());
1127 }
1128
1129 #[tokio::test]
1130 async fn test_delete_with_prefix() {
1131 let tc = new_client("test_delete_with_prefix").await;
1132 tc.gen_data().await;
1133
1134 let req = DeleteRangeRequest::new()
1135 .with_prefix(tc.key("key-"))
1136 .with_prev_kv();
1137 let res = tc.client.delete_range(req).await;
1138 let mut res = res.unwrap();
1139 assert_eq!(10, res.deleted());
1140 let kvs = res.take_prev_kvs();
1141 assert_eq!(10, kvs.len());
1142 for (i, mut kv) in kvs.into_iter().enumerate() {
1143 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1144 }
1145 }
1146
1147 #[tokio::test]
1148 async fn test_delete_with_range() {
1149 let tc = new_client("test_delete_with_range").await;
1150 tc.gen_data().await;
1151
1152 let req = DeleteRangeRequest::new()
1153 .with_range(tc.key("key-2"), tc.key("key-7"))
1154 .with_prev_kv();
1155 let res = tc.client.delete_range(req).await;
1156 let mut res = res.unwrap();
1157 assert_eq!(5, res.deleted());
1158 let kvs = res.take_prev_kvs();
1159 assert_eq!(5, kvs.len());
1160 for (i, mut kv) in kvs.into_iter().enumerate() {
1161 assert_eq!(
1162 format!("{}-{}", "value", i + 2).into_bytes(),
1163 kv.take_value()
1164 );
1165 }
1166 }
1167
1168 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1169 Ok(())
1170 }
1171
1172 #[tokio::test]
1173 async fn test_cluster_client_adaptive_range() {
1174 let tx = new_client("test_cluster_client").await;
1175 let in_memory = tx.in_memory().unwrap();
1176 let cluster_client = tx.client.cluster_client().unwrap();
1177 let mut rng = rand::rng();
1178
1179 for i in 0..10 {
1181 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1182 in_memory
1183 .put(
1184 PutRequest::new()
1185 .with_key(format!("__prefix/{i}").as_bytes())
1186 .with_value(data.clone()),
1187 )
1188 .await
1189 .unwrap();
1190 }
1191
1192 let req = RangeRequest::new().with_prefix(b"__prefix/");
1193 let stream =
1194 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1195
1196 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1197 assert_eq!(10, res.len());
1198 }
1199}