1mod ask_leader;
16pub mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26use std::time::Duration;
27
28use api::v1::meta::{
29 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
30};
31pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
32use cluster::Client as ClusterClient;
33pub use cluster::ClusterKvBackend;
34use common_error::ext::BoxedError;
35use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
36use common_meta::cluster::{
37 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
38};
39use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
40use common_meta::error::{
41 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
42};
43use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
44use common_meta::kv_backend::KvBackendRef;
45use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
46use common_meta::range_stream::PaginationStream;
47use common_meta::rpc::KeyValue;
48use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
49use common_meta::rpc::procedure::{
50 AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
51 MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
52 RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
53};
54use common_meta::rpc::store::{
55 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
56 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
57 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
58};
59use common_telemetry::info;
60use futures::TryStreamExt;
61use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
62use procedure::Client as ProcedureClient;
63use snafu::{OptionExt, ResultExt};
64use store::Client as StoreClient;
65
66pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
67use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
68use crate::error::{
69 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
70 Result,
71};
72
73pub type Id = u64;
74
75const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
76const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
77const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
78const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
79
80#[derive(Clone, Debug, Default)]
81pub struct MetaClientBuilder {
82 id: Id,
83 role: Role,
84 enable_heartbeat: bool,
85 enable_store: bool,
86 enable_procedure: bool,
87 enable_access_cluster_info: bool,
88 region_follower: Option<RegionFollowerClientRef>,
89 channel_manager: Option<ChannelManager>,
90 ddl_channel_manager: Option<ChannelManager>,
91 ddl_timeout: Option<Duration>,
93 heartbeat_channel_manager: Option<ChannelManager>,
94}
95
96impl MetaClientBuilder {
97 pub fn new(member_id: u64, role: Role) -> Self {
98 Self {
99 id: member_id,
100 role,
101 ..Default::default()
102 }
103 }
104
105 pub fn frontend_default_options() -> Self {
107 Self::new(0, Role::Frontend)
109 .enable_store()
110 .enable_heartbeat()
111 .enable_procedure()
112 .enable_access_cluster_info()
113 }
114
115 pub fn datanode_default_options(member_id: u64) -> Self {
117 Self::new(member_id, Role::Datanode)
118 .enable_store()
119 .enable_heartbeat()
120 }
121
122 pub fn flownode_default_options(member_id: u64) -> Self {
124 Self::new(member_id, Role::Flownode)
125 .enable_store()
126 .enable_heartbeat()
127 .enable_procedure()
128 .enable_access_cluster_info()
129 }
130
131 pub fn enable_heartbeat(self) -> Self {
132 Self {
133 enable_heartbeat: true,
134 ..self
135 }
136 }
137
138 pub fn enable_store(self) -> Self {
139 Self {
140 enable_store: true,
141 ..self
142 }
143 }
144
145 pub fn enable_procedure(self) -> Self {
146 Self {
147 enable_procedure: true,
148 ..self
149 }
150 }
151
152 pub fn enable_access_cluster_info(self) -> Self {
153 Self {
154 enable_access_cluster_info: true,
155 ..self
156 }
157 }
158
159 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
160 Self {
161 channel_manager: Some(channel_manager),
162 ..self
163 }
164 }
165
166 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
167 Self {
168 ddl_channel_manager: Some(channel_manager),
169 ..self
170 }
171 }
172
173 pub fn ddl_timeout(self, timeout: Duration) -> Self {
174 Self {
175 ddl_timeout: Some(timeout),
176 ..self
177 }
178 }
179
180 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
181 Self {
182 heartbeat_channel_manager: Some(channel_manager),
183 ..self
184 }
185 }
186
187 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
188 Self {
189 region_follower: Some(region_follower),
190 ..self
191 }
192 }
193
194 pub fn build(self) -> MetaClient {
195 let mgr = self.channel_manager.unwrap_or_default();
196 let heartbeat_channel_manager = self
197 .heartbeat_channel_manager
198 .clone()
199 .unwrap_or_else(|| mgr.clone());
200
201 let heartbeat = self.enable_heartbeat.then(|| {
202 if self.heartbeat_channel_manager.is_some() {
203 info!("Enable heartbeat channel using the heartbeat channel manager.");
204 }
205
206 HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
207 });
208 let store = self
209 .enable_store
210 .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
211 let procedure = self.enable_procedure.then(|| {
212 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
213 ProcedureClient::new(
214 self.id,
215 self.role,
216 mgr,
217 DEFAULT_SUBMIT_DDL_MAX_RETRY,
218 self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
219 )
220 });
221 let cluster = self
222 .enable_access_cluster_info
223 .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
224 let region_follower = self.region_follower.clone();
225
226 MetaClient {
227 id: self.id,
228 channel_manager: mgr.clone(),
229 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
230 self.id,
231 self.role,
232 DEFAULT_ASK_LEADER_MAX_RETRY,
233 heartbeat_channel_manager,
234 )),
235 heartbeat,
236 store,
237 procedure,
238 cluster,
239 region_follower,
240 }
241 }
242}
243
244#[derive(Debug)]
245pub struct MetaClient {
246 id: Id,
247 channel_manager: ChannelManager,
248 leader_provider_factory: LeaderProviderFactoryRef,
249 heartbeat: Option<HeartbeatClient>,
250 store: Option<StoreClient>,
251 procedure: Option<ProcedureClient>,
252 cluster: Option<ClusterClient>,
253 region_follower: Option<RegionFollowerClientRef>,
254}
255
256impl MetaClient {
257 pub fn new(id: Id, role: Role) -> Self {
258 Self {
259 id,
260 channel_manager: ChannelManager::default(),
261 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
262 id,
263 role,
264 DEFAULT_ASK_LEADER_MAX_RETRY,
265 ChannelManager::default(),
266 )),
267 heartbeat: None,
268 store: None,
269 procedure: None,
270 cluster: None,
271 region_follower: None,
272 }
273 }
274}
275
276pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
277
278#[async_trait::async_trait]
280pub trait RegionFollowerClient: Sync + Send + Debug {
281 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
282
283 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
284
285 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
286
287 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
288
289 async fn start(&self, urls: &[&str]) -> Result<()>;
290
291 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
292}
293
294#[async_trait::async_trait]
295impl ProcedureExecutor for MetaClient {
296 async fn submit_ddl_task(
297 &self,
298 _ctx: &ExecutorContext,
299 request: SubmitDdlTaskRequest,
300 ) -> MetaResult<SubmitDdlTaskResponse> {
301 self.submit_ddl_task(request)
302 .await
303 .map_err(BoxedError::new)
304 .context(meta_error::ExternalSnafu)
305 }
306
307 async fn migrate_region(
308 &self,
309 _ctx: &ExecutorContext,
310 request: MigrateRegionRequest,
311 ) -> MetaResult<MigrateRegionResponse> {
312 self.migrate_region(request)
313 .await
314 .map_err(BoxedError::new)
315 .context(meta_error::ExternalSnafu)
316 }
317
318 async fn reconcile(
319 &self,
320 _ctx: &ExecutorContext,
321 request: ReconcileRequest,
322 ) -> MetaResult<ReconcileResponse> {
323 self.reconcile(request)
324 .await
325 .map_err(BoxedError::new)
326 .context(meta_error::ExternalSnafu)
327 }
328
329 async fn manage_region_follower(
330 &self,
331 _ctx: &ExecutorContext,
332 request: ManageRegionFollowerRequest,
333 ) -> MetaResult<()> {
334 if let Some(region_follower) = &self.region_follower {
335 match request {
336 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
337 region_follower
338 .add_region_follower(add_region_follower_request)
339 .await
340 }
341 ManageRegionFollowerRequest::RemoveRegionFollower(
342 remove_region_follower_request,
343 ) => {
344 region_follower
345 .remove_region_follower(remove_region_follower_request)
346 .await
347 }
348 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
349 region_follower
350 .add_table_follower(add_table_follower_request)
351 .await
352 }
353 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
354 region_follower
355 .remove_table_follower(remove_table_follower_request)
356 .await
357 }
358 }
359 .map_err(BoxedError::new)
360 .context(meta_error::ExternalSnafu)
361 } else {
362 UnsupportedSnafu {
363 operation: "manage_region_follower",
364 }
365 .fail()
366 }
367 }
368
369 async fn query_procedure_state(
370 &self,
371 _ctx: &ExecutorContext,
372 pid: &str,
373 ) -> MetaResult<ProcedureStateResponse> {
374 self.query_procedure_state(pid)
375 .await
376 .map_err(BoxedError::new)
377 .context(meta_error::ExternalSnafu)
378 }
379
380 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
381 self.procedure_client()
382 .map_err(BoxedError::new)
383 .context(meta_error::ExternalSnafu)?
384 .list_procedures()
385 .await
386 .map_err(BoxedError::new)
387 .context(meta_error::ExternalSnafu)
388 }
389}
390
391#[allow(deprecated)]
393#[async_trait::async_trait]
394impl ClusterInfo for MetaClient {
395 type Error = Error;
396
397 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
398 let cluster_client = self.cluster_client()?;
399
400 let (get_metasrv_nodes, nodes_key_prefix) = match role {
401 None => (true, Some(NodeInfoKey::key_prefix())),
402 Some(ClusterRole::Metasrv) => (true, None),
403 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
404 };
405
406 let mut nodes = if get_metasrv_nodes {
407 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
410 cluster_client.get_metasrv_peers().await?;
411 followers
412 .into_iter()
413 .map(|node| {
414 if let Some(node_info) = node.info {
415 NodeInfo {
416 peer: node.peer.unwrap_or_default(),
417 last_activity_ts,
418 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
419 version: node_info.version,
420 git_commit: node_info.git_commit,
421 start_time_ms: node_info.start_time_ms,
422 total_cpu_millicores: node_info.total_cpu_millicores,
423 total_memory_bytes: node_info.total_memory_bytes,
424 cpu_usage_millicores: node_info.cpu_usage_millicores,
425 memory_usage_bytes: node_info.memory_usage_bytes,
426 hostname: node_info.hostname,
427 }
428 } else {
429 NodeInfo {
431 peer: node.peer.unwrap_or_default(),
432 last_activity_ts,
433 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
434 version: node.version,
435 git_commit: node.git_commit,
436 start_time_ms: node.start_time_ms,
437 total_cpu_millicores: node.cpus as i64,
438 total_memory_bytes: node.memory_bytes as i64,
439 cpu_usage_millicores: 0,
440 memory_usage_bytes: 0,
441 hostname: "".to_string(),
442 }
443 }
444 })
445 .chain(leader.into_iter().map(|node| {
446 if let Some(node_info) = node.info {
447 NodeInfo {
448 peer: node.peer.unwrap_or_default(),
449 last_activity_ts,
450 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
451 version: node_info.version,
452 git_commit: node_info.git_commit,
453 start_time_ms: node_info.start_time_ms,
454 total_cpu_millicores: node_info.total_cpu_millicores,
455 total_memory_bytes: node_info.total_memory_bytes,
456 cpu_usage_millicores: node_info.cpu_usage_millicores,
457 memory_usage_bytes: node_info.memory_usage_bytes,
458 hostname: node_info.hostname,
459 }
460 } else {
461 NodeInfo {
463 peer: node.peer.unwrap_or_default(),
464 last_activity_ts,
465 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
466 version: node.version,
467 git_commit: node.git_commit,
468 start_time_ms: node.start_time_ms,
469 total_cpu_millicores: node.cpus as i64,
470 total_memory_bytes: node.memory_bytes as i64,
471 cpu_usage_millicores: 0,
472 memory_usage_bytes: 0,
473 hostname: "".to_string(),
474 }
475 }
476 }))
477 .collect::<Vec<_>>()
478 } else {
479 Vec::new()
480 };
481
482 if let Some(prefix) = nodes_key_prefix {
483 let req = RangeRequest::new().with_prefix(prefix);
484 let res = cluster_client.range(req).await?;
485 for kv in res.kvs {
486 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
487 }
488 }
489
490 Ok(nodes)
491 }
492
493 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
494 let cluster_kv_backend = Arc::new(self.cluster_client()?);
495 let range_prefix = DatanodeStatKey::prefix_key();
496 let req = RangeRequest::new().with_prefix(range_prefix);
497 let stream =
498 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
499 let mut datanode_stats = stream
500 .try_collect::<Vec<_>>()
501 .await
502 .context(ConvertMetaResponseSnafu)?;
503 let region_stats = datanode_stats
504 .iter_mut()
505 .flat_map(|datanode_stat| {
506 let last = datanode_stat.stats.pop();
507 last.map(|stat| stat.region_stats).unwrap_or_default()
508 })
509 .collect::<Vec<_>>();
510
511 Ok(region_stats)
512 }
513
514 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
515 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
516 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
517 let flow_state_manager = FlowStateManager::new(cluster_backend);
518 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
519
520 Ok(res.map(|r| r.into()))
521 }
522}
523
524fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
525 DatanodeStatValue::try_from(kv.value)
526 .map_err(BoxedError::new)
527 .context(ExternalSnafu)
528}
529
530impl MetaClient {
531 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
532 where
533 U: AsRef<str>,
534 A: AsRef<[U]> + Clone,
535 {
536 info!("MetaClient channel config: {:?}", self.channel_config());
537
538 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
539 let leader_provider = self.leader_provider_factory.create(&urls);
540
541 self.start_with(leader_provider, urls).await
542 }
543
544 pub(crate) async fn start_with<U, A>(
546 &mut self,
547 leader_provider: LeaderProviderRef,
548 peers: A,
549 ) -> Result<()>
550 where
551 U: AsRef<str>,
552 A: AsRef<[U]> + Clone,
553 {
554 if let Some(client) = &self.region_follower {
555 info!("Starting region follower client ...");
556 client.start_with(leader_provider.clone()).await?;
557 }
558
559 if let Some(client) = &self.heartbeat {
560 info!("Starting heartbeat client ...");
561 client.start_with(leader_provider.clone()).await?;
562 }
563
564 if let Some(client) = &mut self.store {
565 info!("Starting store client ...");
566 client.start(peers.clone()).await?;
567 }
568
569 if let Some(client) = &self.procedure {
570 info!("Starting procedure client ...");
571 client.start_with(leader_provider.clone()).await?;
572 }
573
574 if let Some(client) = &mut self.cluster {
575 info!("Starting cluster client ...");
576 client.start_with(leader_provider).await?;
577 }
578 Ok(())
579 }
580
581 pub async fn ask_leader(&self) -> Result<String> {
584 self.heartbeat_client()?.ask_leader().await
585 }
586
587 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
596 self.heartbeat_client()?.heartbeat().await
597 }
598
599 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
601 self.store_client()?
602 .range(req.into())
603 .await?
604 .try_into()
605 .context(ConvertMetaResponseSnafu)
606 }
607
608 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
610 self.store_client()?
611 .put(req.into())
612 .await?
613 .try_into()
614 .context(ConvertMetaResponseSnafu)
615 }
616
617 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
619 self.store_client()?
620 .batch_get(req.into())
621 .await?
622 .try_into()
623 .context(ConvertMetaResponseSnafu)
624 }
625
626 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
628 self.store_client()?
629 .batch_put(req.into())
630 .await?
631 .try_into()
632 .context(ConvertMetaResponseSnafu)
633 }
634
635 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
637 self.store_client()?
638 .batch_delete(req.into())
639 .await?
640 .try_into()
641 .context(ConvertMetaResponseSnafu)
642 }
643
644 pub async fn compare_and_put(
647 &self,
648 req: CompareAndPutRequest,
649 ) -> Result<CompareAndPutResponse> {
650 self.store_client()?
651 .compare_and_put(req.into())
652 .await?
653 .try_into()
654 .context(ConvertMetaResponseSnafu)
655 }
656
657 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
659 self.store_client()?
660 .delete_range(req.into())
661 .await?
662 .try_into()
663 .context(ConvertMetaResponseSnafu)
664 }
665
666 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
668 self.procedure_client()?.query_procedure_state(pid).await
669 }
670
671 pub async fn migrate_region(
673 &self,
674 request: MigrateRegionRequest,
675 ) -> Result<MigrateRegionResponse> {
676 self.procedure_client()?
677 .migrate_region(
678 request.region_id,
679 request.from_peer,
680 request.to_peer,
681 request.timeout,
682 )
683 .await
684 }
685
686 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
688 self.procedure_client()?.reconcile(request).await
689 }
690
691 pub async fn submit_ddl_task(
693 &self,
694 req: SubmitDdlTaskRequest,
695 ) -> Result<SubmitDdlTaskResponse> {
696 let res = self
697 .procedure_client()?
698 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
699 .await?
700 .try_into()
701 .context(ConvertMetaResponseSnafu)?;
702
703 Ok(res)
704 }
705
706 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
707 self.heartbeat.clone().context(NotStartedSnafu {
708 name: "heartbeat_client",
709 })
710 }
711
712 pub fn store_client(&self) -> Result<StoreClient> {
713 self.store.clone().context(NotStartedSnafu {
714 name: "store_client",
715 })
716 }
717
718 pub fn procedure_client(&self) -> Result<ProcedureClient> {
719 self.procedure.clone().context(NotStartedSnafu {
720 name: "procedure_client",
721 })
722 }
723
724 pub fn cluster_client(&self) -> Result<ClusterClient> {
725 self.cluster.clone().context(NotStartedSnafu {
726 name: "cluster_client",
727 })
728 }
729
730 pub fn channel_config(&self) -> &ChannelConfig {
731 self.channel_manager.config()
732 }
733
734 pub fn id(&self) -> Id {
735 self.id
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use std::sync::atomic::{AtomicUsize, Ordering};
742
743 use api::v1::meta::{HeartbeatRequest, Peer};
744 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
745 use rand::Rng;
746
747 use super::*;
748 use crate::error;
749 use crate::mocks::{self, MockMetaContext};
750
751 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
752
753 struct TestClient {
754 ns: String,
755 client: MetaClient,
756 meta_ctx: MockMetaContext,
757 }
758
759 impl TestClient {
760 async fn new(ns: impl Into<String>) -> Self {
761 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
763 Self {
764 ns: ns.into(),
765 client,
766 meta_ctx,
767 }
768 }
769
770 fn key(&self, name: &str) -> Vec<u8> {
771 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
772 }
773
774 async fn gen_data(&self) {
775 for i in 0..10 {
776 let req = PutRequest::new()
777 .with_key(self.key(&format!("key-{i}")))
778 .with_value(format!("{}-{}", "value", i).into_bytes())
779 .with_prev_kv();
780 let res = self.client.put(req).await;
781 let _ = res.unwrap();
782 }
783 }
784
785 async fn clear_data(&self) {
786 let req =
787 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
788 let res = self.client.delete_range(req).await;
789 let _ = res.unwrap();
790 }
791
792 #[allow(dead_code)]
793 fn kv_backend(&self) -> KvBackendRef {
794 self.meta_ctx.kv_backend.clone()
795 }
796
797 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
798 self.meta_ctx.in_memory.clone()
799 }
800 }
801
802 async fn new_client(ns: impl Into<String>) -> TestClient {
803 let client = TestClient::new(ns).await;
804 client.clear_data().await;
805 client
806 }
807
808 #[tokio::test]
809 async fn test_meta_client_builder() {
810 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
811
812 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
813 .enable_heartbeat()
814 .build();
815 let _ = meta_client.heartbeat_client().unwrap();
816 assert!(meta_client.store_client().is_err());
817 meta_client.start(urls).await.unwrap();
818
819 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
820 assert!(meta_client.heartbeat_client().is_err());
821 assert!(meta_client.store_client().is_err());
822 meta_client.start(urls).await.unwrap();
823
824 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
825 .enable_store()
826 .build();
827 assert!(meta_client.heartbeat_client().is_err());
828 let _ = meta_client.store_client().unwrap();
829 meta_client.start(urls).await.unwrap();
830
831 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
832 .enable_heartbeat()
833 .enable_store()
834 .build();
835 assert_eq!(2, meta_client.id());
836 assert_eq!(2, meta_client.id());
837 let _ = meta_client.heartbeat_client().unwrap();
838 let _ = meta_client.store_client().unwrap();
839 meta_client.start(urls).await.unwrap();
840 }
841
842 #[tokio::test]
843 async fn test_not_start_heartbeat_client() {
844 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
845 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
846 .enable_store()
847 .build();
848 meta_client.start(urls).await.unwrap();
849 let res = meta_client.ask_leader().await;
850 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
851 }
852
853 #[tokio::test]
854 async fn test_not_start_store_client() {
855 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
856 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
857 .enable_heartbeat()
858 .build();
859
860 meta_client.start(urls).await.unwrap();
861 let res = meta_client.put(PutRequest::default()).await;
862 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
863 }
864
865 #[tokio::test]
866 async fn test_ask_leader() {
867 let tc = new_client("test_ask_leader").await;
868 tc.client.ask_leader().await.unwrap();
869 }
870
871 #[tokio::test]
872 async fn test_heartbeat() {
873 let tc = new_client("test_heartbeat").await;
874 let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
875 let request_sent = Arc::new(AtomicUsize::new(0));
878 let request_sent_clone = request_sent.clone();
879 let _handle = tokio::spawn(async move {
880 for _ in 0..5 {
881 let req = HeartbeatRequest {
882 peer: Some(Peer {
883 id: 1,
884 addr: "meta_client_peer".to_string(),
885 }),
886 ..Default::default()
887 };
888 sender.send(req).await.unwrap();
889 request_sent_clone.fetch_add(1, Ordering::Relaxed);
890 }
891 });
892
893 let heartbeat_count = Arc::new(AtomicUsize::new(0));
894 let heartbeat_count_clone = heartbeat_count.clone();
895 let handle = tokio::spawn(async move {
896 while let Some(_resp) = receiver.message().await.unwrap() {
897 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
898 }
899 });
900
901 handle.await.unwrap();
902 assert_eq!(
904 request_sent.load(Ordering::Relaxed) + 1,
905 heartbeat_count.load(Ordering::Relaxed)
906 );
907 }
908
909 #[tokio::test]
910 async fn test_range_get() {
911 let tc = new_client("test_range_get").await;
912 tc.gen_data().await;
913
914 let key = tc.key("key-0");
915 let req = RangeRequest::new().with_key(key.as_slice());
916 let res = tc.client.range(req).await;
917 let mut kvs = res.unwrap().take_kvs();
918 assert_eq!(1, kvs.len());
919 let mut kv = kvs.pop().unwrap();
920 assert_eq!(key, kv.take_key());
921 assert_eq!(b"value-0".to_vec(), kv.take_value());
922 }
923
924 #[tokio::test]
925 async fn test_range_get_prefix() {
926 let tc = new_client("test_range_get_prefix").await;
927 tc.gen_data().await;
928
929 let req = RangeRequest::new().with_prefix(tc.key("key-"));
930 let res = tc.client.range(req).await;
931 let kvs = res.unwrap().take_kvs();
932 assert_eq!(10, kvs.len());
933 for (i, mut kv) in kvs.into_iter().enumerate() {
934 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
935 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
936 }
937 }
938
939 #[tokio::test]
940 async fn test_range() {
941 let tc = new_client("test_range").await;
942 tc.gen_data().await;
943
944 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
945 let res = tc.client.range(req).await;
946 let kvs = res.unwrap().take_kvs();
947 assert_eq!(3, kvs.len());
948 for (i, mut kv) in kvs.into_iter().enumerate() {
949 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
950 assert_eq!(
951 format!("{}-{}", "value", i + 5).into_bytes(),
952 kv.take_value()
953 );
954 }
955 }
956
957 #[tokio::test]
958 async fn test_range_keys_only() {
959 let tc = new_client("test_range_keys_only").await;
960 tc.gen_data().await;
961
962 let req = RangeRequest::new()
963 .with_range(tc.key("key-5"), tc.key("key-8"))
964 .with_keys_only();
965 let res = tc.client.range(req).await;
966 let kvs = res.unwrap().take_kvs();
967 assert_eq!(3, kvs.len());
968 for (i, mut kv) in kvs.into_iter().enumerate() {
969 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
970 assert!(kv.take_value().is_empty());
971 }
972 }
973
974 #[tokio::test]
975 async fn test_put() {
976 let tc = new_client("test_put").await;
977
978 let req = PutRequest::new()
979 .with_key(tc.key("key"))
980 .with_value(b"value".to_vec());
981 let res = tc.client.put(req).await;
982 assert!(res.unwrap().prev_kv.is_none());
983 }
984
985 #[tokio::test]
986 async fn test_put_with_prev_kv() {
987 let tc = new_client("test_put_with_prev_kv").await;
988
989 let key = tc.key("key");
990 let req = PutRequest::new()
991 .with_key(key.as_slice())
992 .with_value(b"value".to_vec())
993 .with_prev_kv();
994 let res = tc.client.put(req).await;
995 assert!(res.unwrap().prev_kv.is_none());
996
997 let req = PutRequest::new()
998 .with_key(key.as_slice())
999 .with_value(b"value1".to_vec())
1000 .with_prev_kv();
1001 let res = tc.client.put(req).await;
1002 let mut kv = res.unwrap().prev_kv.unwrap();
1003 assert_eq!(key, kv.take_key());
1004 assert_eq!(b"value".to_vec(), kv.take_value());
1005 }
1006
1007 #[tokio::test]
1008 async fn test_batch_put() {
1009 let tc = new_client("test_batch_put").await;
1010
1011 let mut req = BatchPutRequest::new();
1012 for i in 0..275 {
1013 req = req.add_kv(
1014 tc.key(&format!("key-{}", i)),
1015 format!("value-{}", i).into_bytes(),
1016 );
1017 }
1018
1019 let res = tc.client.batch_put(req).await;
1020 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1021
1022 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1023 let res = tc.client.range(req).await;
1024 let kvs = res.unwrap().take_kvs();
1025 assert_eq!(275, kvs.len());
1026 }
1027
1028 #[tokio::test]
1029 async fn test_batch_get() {
1030 let tc = new_client("test_batch_get").await;
1031 tc.gen_data().await;
1032
1033 let mut req = BatchGetRequest::default();
1034 for i in 0..256 {
1035 req = req.add_key(tc.key(&format!("key-{}", i)));
1036 }
1037 let res = tc.client.batch_get(req).await.unwrap();
1038 assert_eq!(10, res.kvs.len());
1039
1040 let req = BatchGetRequest::default()
1041 .add_key(tc.key("key-1"))
1042 .add_key(tc.key("key-999"));
1043 let res = tc.client.batch_get(req).await.unwrap();
1044 assert_eq!(1, res.kvs.len());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_batch_put_with_prev_kv() {
1049 let tc = new_client("test_batch_put_with_prev_kv").await;
1050
1051 let key = tc.key("key");
1052 let key2 = tc.key("key2");
1053 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1054 let res = tc.client.batch_put(req).await;
1055 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1056
1057 let req = BatchPutRequest::new()
1058 .add_kv(key.as_slice(), b"value-".to_vec())
1059 .add_kv(key2.as_slice(), b"value2-".to_vec())
1060 .with_prev_kv();
1061 let res = tc.client.batch_put(req).await;
1062 let mut kvs = res.unwrap().take_prev_kvs();
1063 assert_eq!(1, kvs.len());
1064 let mut kv = kvs.pop().unwrap();
1065 assert_eq!(key, kv.take_key());
1066 assert_eq!(b"value".to_vec(), kv.take_value());
1067 }
1068
1069 #[tokio::test]
1070 async fn test_compare_and_put() {
1071 let tc = new_client("test_compare_and_put").await;
1072
1073 let key = tc.key("key");
1074 let req = CompareAndPutRequest::new()
1075 .with_key(key.as_slice())
1076 .with_expect(b"expect".to_vec())
1077 .with_value(b"value".to_vec());
1078 let res = tc.client.compare_and_put(req).await;
1079 assert!(!res.unwrap().is_success());
1080
1081 let req = CompareAndPutRequest::new()
1083 .with_key(key.as_slice())
1084 .with_value(b"value".to_vec());
1085 let res = tc.client.compare_and_put(req).await;
1086 let mut res = res.unwrap();
1087 assert!(res.is_success());
1088 assert!(res.take_prev_kv().is_none());
1089
1090 let req = CompareAndPutRequest::new()
1092 .with_key(key.as_slice())
1093 .with_expect(b"not_eq".to_vec())
1094 .with_value(b"value2".to_vec());
1095 let res = tc.client.compare_and_put(req).await;
1096 let mut res = res.unwrap();
1097 assert!(!res.is_success());
1098 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1099
1100 let req = CompareAndPutRequest::new()
1102 .with_key(key.as_slice())
1103 .with_expect(b"value".to_vec())
1104 .with_value(b"value2".to_vec());
1105 let res = tc.client.compare_and_put(req).await;
1106 let mut res = res.unwrap();
1107 assert!(res.is_success());
1108
1109 assert!(res.take_prev_kv().is_none());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_delete_with_key() {
1115 let tc = new_client("test_delete_with_key").await;
1116 tc.gen_data().await;
1117
1118 let req = DeleteRangeRequest::new()
1119 .with_key(tc.key("key-0"))
1120 .with_prev_kv();
1121 let res = tc.client.delete_range(req).await;
1122 let mut res = res.unwrap();
1123 assert_eq!(1, res.deleted());
1124 let mut kvs = res.take_prev_kvs();
1125 assert_eq!(1, kvs.len());
1126 let mut kv = kvs.pop().unwrap();
1127 assert_eq!(b"value-0".to_vec(), kv.take_value());
1128 }
1129
1130 #[tokio::test]
1131 async fn test_delete_with_prefix() {
1132 let tc = new_client("test_delete_with_prefix").await;
1133 tc.gen_data().await;
1134
1135 let req = DeleteRangeRequest::new()
1136 .with_prefix(tc.key("key-"))
1137 .with_prev_kv();
1138 let res = tc.client.delete_range(req).await;
1139 let mut res = res.unwrap();
1140 assert_eq!(10, res.deleted());
1141 let kvs = res.take_prev_kvs();
1142 assert_eq!(10, kvs.len());
1143 for (i, mut kv) in kvs.into_iter().enumerate() {
1144 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1145 }
1146 }
1147
1148 #[tokio::test]
1149 async fn test_delete_with_range() {
1150 let tc = new_client("test_delete_with_range").await;
1151 tc.gen_data().await;
1152
1153 let req = DeleteRangeRequest::new()
1154 .with_range(tc.key("key-2"), tc.key("key-7"))
1155 .with_prev_kv();
1156 let res = tc.client.delete_range(req).await;
1157 let mut res = res.unwrap();
1158 assert_eq!(5, res.deleted());
1159 let kvs = res.take_prev_kvs();
1160 assert_eq!(5, kvs.len());
1161 for (i, mut kv) in kvs.into_iter().enumerate() {
1162 assert_eq!(
1163 format!("{}-{}", "value", i + 2).into_bytes(),
1164 kv.take_value()
1165 );
1166 }
1167 }
1168
1169 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1170 Ok(())
1171 }
1172
1173 #[tokio::test]
1174 async fn test_cluster_client_adaptive_range() {
1175 let tx = new_client("test_cluster_client").await;
1176 let in_memory = tx.in_memory().unwrap();
1177 let cluster_client = tx.client.cluster_client().unwrap();
1178 let mut rng = rand::rng();
1179
1180 for i in 0..10 {
1182 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1183 in_memory
1184 .put(
1185 PutRequest::new()
1186 .with_key(format!("__prefix/{i}").as_bytes())
1187 .with_value(data.clone()),
1188 )
1189 .await
1190 .unwrap();
1191 }
1192
1193 let req = RangeRequest::new().with_prefix(b"__prefix/");
1194 let stream =
1195 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1196
1197 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1198 assert_eq!(10, res.len());
1199 }
1200}