1mod ask_leader;
16mod config;
17pub mod heartbeat;
18mod load_balance;
19mod procedure;
20
21mod cluster;
22mod store;
23mod util;
24
25use std::fmt::Debug;
26use std::sync::Arc;
27use std::time::Duration;
28
29use api::v1::meta::{
30 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
31};
32pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
33use cluster::Client as ClusterClient;
34pub use cluster::ClusterKvBackend;
35use common_error::ext::BoxedError;
36use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
37use common_meta::cluster::{
38 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
39};
40use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
41use common_meta::error::{
42 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
43};
44use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
45use common_meta::kv_backend::KvBackendRef;
46use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
47use common_meta::range_stream::PaginationStream;
48use common_meta::rpc::KeyValue;
49use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
50use common_meta::rpc::procedure::{
51 AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
52 GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
53 ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
54};
55use common_meta::rpc::store::{
56 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
57 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
58 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
59};
60use common_options::plugin_options::PluginOptionsDeserializer;
61use common_telemetry::info;
62use config::Client as ConfigClient;
63use futures::TryStreamExt;
64use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
65use procedure::Client as ProcedureClient;
66use serde::de::DeserializeOwned;
67use snafu::{OptionExt, ResultExt};
68use store::Client as StoreClient;
69
70pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
71use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
72use crate::error::{
73 ConvertMetaConfigSnafu, ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error,
74 GetFlowStatSnafu, NotStartedSnafu, Result,
75};
76
77pub type Id = u64;
78
79const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
80const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
81const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
82const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
83
84#[derive(Clone, Debug, Default)]
85pub struct MetaClientBuilder {
86 id: Id,
87 role: Role,
88 enable_heartbeat: bool,
89 enable_store: bool,
90 enable_procedure: bool,
91 enable_access_cluster_info: bool,
92 region_follower: Option<RegionFollowerClientRef>,
93 channel_manager: Option<ChannelManager>,
94 ddl_channel_manager: Option<ChannelManager>,
95 ddl_timeout: Option<Duration>,
97 heartbeat_channel_manager: Option<ChannelManager>,
98}
99
100impl MetaClientBuilder {
101 pub fn new(member_id: u64, role: Role) -> Self {
102 Self {
103 id: member_id,
104 role,
105 ..Default::default()
106 }
107 }
108
109 pub fn frontend_default_options() -> Self {
111 Self::new(0, Role::Frontend)
113 .enable_store()
114 .enable_heartbeat()
115 .enable_procedure()
116 .enable_access_cluster_info()
117 }
118
119 pub fn datanode_default_options(member_id: u64) -> Self {
121 Self::new(member_id, Role::Datanode)
122 .enable_store()
123 .enable_heartbeat()
124 }
125
126 pub fn flownode_default_options(member_id: u64) -> Self {
128 Self::new(member_id, Role::Flownode)
129 .enable_store()
130 .enable_heartbeat()
131 .enable_procedure()
132 .enable_access_cluster_info()
133 }
134
135 pub fn enable_heartbeat(self) -> Self {
136 Self {
137 enable_heartbeat: true,
138 ..self
139 }
140 }
141
142 pub fn enable_store(self) -> Self {
143 Self {
144 enable_store: true,
145 ..self
146 }
147 }
148
149 pub fn enable_procedure(self) -> Self {
150 Self {
151 enable_procedure: true,
152 ..self
153 }
154 }
155
156 pub fn enable_access_cluster_info(self) -> Self {
157 Self {
158 enable_access_cluster_info: true,
159 ..self
160 }
161 }
162
163 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
164 Self {
165 channel_manager: Some(channel_manager),
166 ..self
167 }
168 }
169
170 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
171 Self {
172 ddl_channel_manager: Some(channel_manager),
173 ..self
174 }
175 }
176
177 pub fn ddl_timeout(self, timeout: Duration) -> Self {
178 Self {
179 ddl_timeout: Some(timeout),
180 ..self
181 }
182 }
183
184 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
185 Self {
186 heartbeat_channel_manager: Some(channel_manager),
187 ..self
188 }
189 }
190
191 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
192 Self {
193 region_follower: Some(region_follower),
194 ..self
195 }
196 }
197
198 pub fn build(self) -> MetaClient {
199 let mgr = self.channel_manager.unwrap_or_default();
200 let heartbeat_channel_manager = self
201 .heartbeat_channel_manager
202 .clone()
203 .unwrap_or_else(|| mgr.clone());
204
205 let heartbeat = self.enable_heartbeat.then(|| {
206 if self.heartbeat_channel_manager.is_some() {
207 info!("Enable heartbeat channel using the heartbeat channel manager.");
208 }
209
210 HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
211 });
212 let config = self
213 .enable_heartbeat
214 .then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
215 let store = self
216 .enable_store
217 .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
218 let procedure = self.enable_procedure.then(|| {
219 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
220 ProcedureClient::new(
221 self.id,
222 self.role,
223 mgr,
224 DEFAULT_SUBMIT_DDL_MAX_RETRY,
225 self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
226 )
227 });
228 let cluster = self
229 .enable_access_cluster_info
230 .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
231 let region_follower = self.region_follower.clone();
232
233 MetaClient {
234 id: self.id,
235 channel_manager: mgr.clone(),
236 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
237 self.id,
238 self.role,
239 DEFAULT_ASK_LEADER_MAX_RETRY,
240 heartbeat_channel_manager,
241 )),
242 heartbeat,
243 config,
244 store,
245 procedure,
246 cluster,
247 region_follower,
248 }
249 }
250}
251
252#[derive(Debug)]
253pub struct MetaClient {
254 id: Id,
255 channel_manager: ChannelManager,
256 leader_provider_factory: LeaderProviderFactoryRef,
257 heartbeat: Option<HeartbeatClient>,
258 config: Option<ConfigClient>,
259 store: Option<StoreClient>,
260 procedure: Option<ProcedureClient>,
261 cluster: Option<ClusterClient>,
262 region_follower: Option<RegionFollowerClientRef>,
263}
264
265impl MetaClient {
266 pub fn new(id: Id, role: Role) -> Self {
267 Self {
268 id,
269 channel_manager: ChannelManager::default(),
270 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
271 id,
272 role,
273 DEFAULT_ASK_LEADER_MAX_RETRY,
274 ChannelManager::default(),
275 )),
276 heartbeat: None,
277 config: None,
278 store: None,
279 procedure: None,
280 cluster: None,
281 region_follower: None,
282 }
283 }
284}
285
286pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
287
288#[async_trait::async_trait]
290pub trait RegionFollowerClient: Sync + Send + Debug {
291 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
292
293 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
294
295 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
296
297 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
298
299 async fn start(&self, urls: &[&str]) -> Result<()>;
300
301 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
302}
303
304#[async_trait::async_trait]
305impl ProcedureExecutor for MetaClient {
306 async fn submit_ddl_task(
307 &self,
308 _ctx: &ExecutorContext,
309 request: SubmitDdlTaskRequest,
310 ) -> MetaResult<SubmitDdlTaskResponse> {
311 self.submit_ddl_task(request)
312 .await
313 .map_err(BoxedError::new)
314 .context(meta_error::ExternalSnafu)
315 }
316
317 async fn migrate_region(
318 &self,
319 _ctx: &ExecutorContext,
320 request: MigrateRegionRequest,
321 ) -> MetaResult<MigrateRegionResponse> {
322 self.migrate_region(request)
323 .await
324 .map_err(BoxedError::new)
325 .context(meta_error::ExternalSnafu)
326 }
327
328 async fn reconcile(
329 &self,
330 _ctx: &ExecutorContext,
331 request: ReconcileRequest,
332 ) -> MetaResult<ReconcileResponse> {
333 self.reconcile(request)
334 .await
335 .map_err(BoxedError::new)
336 .context(meta_error::ExternalSnafu)
337 }
338
339 async fn manage_region_follower(
340 &self,
341 _ctx: &ExecutorContext,
342 request: ManageRegionFollowerRequest,
343 ) -> MetaResult<()> {
344 if let Some(region_follower) = &self.region_follower {
345 match request {
346 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
347 region_follower
348 .add_region_follower(add_region_follower_request)
349 .await
350 }
351 ManageRegionFollowerRequest::RemoveRegionFollower(
352 remove_region_follower_request,
353 ) => {
354 region_follower
355 .remove_region_follower(remove_region_follower_request)
356 .await
357 }
358 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
359 region_follower
360 .add_table_follower(add_table_follower_request)
361 .await
362 }
363 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
364 region_follower
365 .remove_table_follower(remove_table_follower_request)
366 .await
367 }
368 }
369 .map_err(BoxedError::new)
370 .context(meta_error::ExternalSnafu)
371 } else {
372 UnsupportedSnafu {
373 operation: "manage_region_follower",
374 }
375 .fail()
376 }
377 }
378
379 async fn query_procedure_state(
380 &self,
381 _ctx: &ExecutorContext,
382 pid: &str,
383 ) -> MetaResult<ProcedureStateResponse> {
384 self.query_procedure_state(pid)
385 .await
386 .map_err(BoxedError::new)
387 .context(meta_error::ExternalSnafu)
388 }
389
390 async fn gc_regions(
391 &self,
392 _ctx: &ExecutorContext,
393 request: GcRegionsRequest,
394 ) -> MetaResult<GcResponse> {
395 self.gc_regions(request)
396 .await
397 .map_err(BoxedError::new)
398 .context(meta_error::ExternalSnafu)
399 }
400
401 async fn gc_table(
402 &self,
403 _ctx: &ExecutorContext,
404 request: GcTableRequest,
405 ) -> MetaResult<GcResponse> {
406 self.gc_table(request)
407 .await
408 .map_err(BoxedError::new)
409 .context(meta_error::ExternalSnafu)
410 }
411
412 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
413 self.procedure_client()
414 .map_err(BoxedError::new)
415 .context(meta_error::ExternalSnafu)?
416 .list_procedures()
417 .await
418 .map_err(BoxedError::new)
419 .context(meta_error::ExternalSnafu)
420 }
421}
422
423#[allow(deprecated)]
425#[async_trait::async_trait]
426impl ClusterInfo for MetaClient {
427 type Error = Error;
428
429 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
430 let cluster_client = self.cluster_client()?;
431
432 let (get_metasrv_nodes, nodes_key_prefix) = match role {
433 None => (true, Some(NodeInfoKey::key_prefix())),
434 Some(ClusterRole::Metasrv) => (true, None),
435 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
436 };
437
438 let mut nodes = if get_metasrv_nodes {
439 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
442 cluster_client.get_metasrv_peers().await?;
443 followers
444 .into_iter()
445 .map(|node| {
446 if let Some(node_info) = node.info {
447 NodeInfo {
448 peer: node.peer.unwrap_or_default(),
449 last_activity_ts,
450 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
451 version: node_info.version,
452 git_commit: node_info.git_commit,
453 start_time_ms: node_info.start_time_ms,
454 total_cpu_millicores: node_info.total_cpu_millicores,
455 total_memory_bytes: node_info.total_memory_bytes,
456 cpu_usage_millicores: node_info.cpu_usage_millicores,
457 memory_usage_bytes: node_info.memory_usage_bytes,
458 hostname: node_info.hostname,
459 }
460 } else {
461 NodeInfo {
463 peer: node.peer.unwrap_or_default(),
464 last_activity_ts,
465 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
466 version: node.version,
467 git_commit: node.git_commit,
468 start_time_ms: node.start_time_ms,
469 total_cpu_millicores: node.cpus as i64,
470 total_memory_bytes: node.memory_bytes as i64,
471 cpu_usage_millicores: 0,
472 memory_usage_bytes: 0,
473 hostname: "".to_string(),
474 }
475 }
476 })
477 .chain(leader.into_iter().map(|node| {
478 if let Some(node_info) = node.info {
479 NodeInfo {
480 peer: node.peer.unwrap_or_default(),
481 last_activity_ts,
482 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
483 version: node_info.version,
484 git_commit: node_info.git_commit,
485 start_time_ms: node_info.start_time_ms,
486 total_cpu_millicores: node_info.total_cpu_millicores,
487 total_memory_bytes: node_info.total_memory_bytes,
488 cpu_usage_millicores: node_info.cpu_usage_millicores,
489 memory_usage_bytes: node_info.memory_usage_bytes,
490 hostname: node_info.hostname,
491 }
492 } else {
493 NodeInfo {
495 peer: node.peer.unwrap_or_default(),
496 last_activity_ts,
497 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
498 version: node.version,
499 git_commit: node.git_commit,
500 start_time_ms: node.start_time_ms,
501 total_cpu_millicores: node.cpus as i64,
502 total_memory_bytes: node.memory_bytes as i64,
503 cpu_usage_millicores: 0,
504 memory_usage_bytes: 0,
505 hostname: "".to_string(),
506 }
507 }
508 }))
509 .collect::<Vec<_>>()
510 } else {
511 Vec::new()
512 };
513
514 if let Some(prefix) = nodes_key_prefix {
515 let req = RangeRequest::new().with_prefix(prefix);
516 let res = cluster_client.range(req).await?;
517 for kv in res.kvs {
518 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
519 }
520 }
521
522 Ok(nodes)
523 }
524
525 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
526 let cluster_kv_backend = Arc::new(self.cluster_client()?);
527 let range_prefix = DatanodeStatKey::prefix_key();
528 let req = RangeRequest::new().with_prefix(range_prefix);
529 let stream =
530 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
531 let mut datanode_stats = stream
532 .try_collect::<Vec<_>>()
533 .await
534 .context(ConvertMetaResponseSnafu)?;
535 let region_stats = datanode_stats
536 .iter_mut()
537 .flat_map(|datanode_stat| {
538 let last = datanode_stat.stats.pop();
539 last.map(|stat| stat.region_stats).unwrap_or_default()
540 })
541 .collect::<Vec<_>>();
542
543 Ok(region_stats)
544 }
545
546 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
547 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
548 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
549 let flow_state_manager = FlowStateManager::new(cluster_backend);
550 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
551
552 Ok(res.map(|r| r.into()))
553 }
554}
555
556fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
557 DatanodeStatValue::try_from(kv.value)
558 .map_err(BoxedError::new)
559 .context(ExternalSnafu)
560}
561
562impl MetaClient {
563 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
564 where
565 U: AsRef<str>,
566 A: AsRef<[U]> + Clone,
567 {
568 info!("MetaClient channel config: {:?}", self.channel_config());
569
570 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
571 let leader_provider = self.leader_provider_factory.create(&urls);
572
573 self.start_with(leader_provider, urls).await
574 }
575
576 pub(crate) async fn start_with<U, A>(
578 &mut self,
579 leader_provider: LeaderProviderRef,
580 peers: A,
581 ) -> Result<()>
582 where
583 U: AsRef<str>,
584 A: AsRef<[U]> + Clone,
585 {
586 if let Some(client) = &self.region_follower {
587 info!("Starting region follower client ...");
588 client.start_with(leader_provider.clone()).await?;
589 }
590
591 if let Some(client) = &self.heartbeat {
592 info!("Starting heartbeat client ...");
593 client.start_with(leader_provider.clone()).await?;
594 }
595
596 if let Some(client) = &self.config {
597 info!("Starting config client ...");
598 client.start_with(leader_provider.clone()).await?;
599 }
600
601 if let Some(client) = &mut self.store {
602 info!("Starting store client ...");
603 client.start(peers.clone()).await?;
604 }
605
606 if let Some(client) = &self.procedure {
607 info!("Starting procedure client ...");
608 client.start_with(leader_provider.clone()).await?;
609 }
610
611 if let Some(client) = &mut self.cluster {
612 info!("Starting cluster client ...");
613 client.start_with(leader_provider).await?;
614 }
615 Ok(())
616 }
617
618 pub async fn ask_leader(&self) -> Result<String> {
621 self.heartbeat_client()?.ask_leader().await
622 }
623
624 pub async fn pull_config<T, U>(&self, deserializer: T) -> Result<U>
625 where
626 T: PluginOptionsDeserializer<U>,
627 U: DeserializeOwned,
628 {
629 let res = self.config_client()?.pull_config().await?;
630 let v = deserializer
631 .deserialize(&res.payload)
632 .context(ConvertMetaConfigSnafu)?;
633 Ok(v)
634 }
635
636 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
645 self.heartbeat_client()?.heartbeat().await
646 }
647
648 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
650 self.store_client()?
651 .range(req.into())
652 .await?
653 .try_into()
654 .context(ConvertMetaResponseSnafu)
655 }
656
657 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
659 self.store_client()?
660 .put(req.into())
661 .await?
662 .try_into()
663 .context(ConvertMetaResponseSnafu)
664 }
665
666 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
668 self.store_client()?
669 .batch_get(req.into())
670 .await?
671 .try_into()
672 .context(ConvertMetaResponseSnafu)
673 }
674
675 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
677 self.store_client()?
678 .batch_put(req.into())
679 .await?
680 .try_into()
681 .context(ConvertMetaResponseSnafu)
682 }
683
684 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
686 self.store_client()?
687 .batch_delete(req.into())
688 .await?
689 .try_into()
690 .context(ConvertMetaResponseSnafu)
691 }
692
693 pub async fn compare_and_put(
696 &self,
697 req: CompareAndPutRequest,
698 ) -> Result<CompareAndPutResponse> {
699 self.store_client()?
700 .compare_and_put(req.into())
701 .await?
702 .try_into()
703 .context(ConvertMetaResponseSnafu)
704 }
705
706 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
708 self.store_client()?
709 .delete_range(req.into())
710 .await?
711 .try_into()
712 .context(ConvertMetaResponseSnafu)
713 }
714
715 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
717 self.procedure_client()?.query_procedure_state(pid).await
718 }
719
720 pub async fn migrate_region(
722 &self,
723 request: MigrateRegionRequest,
724 ) -> Result<MigrateRegionResponse> {
725 self.procedure_client()?
726 .migrate_region(
727 request.region_id,
728 request.from_peer,
729 request.to_peer,
730 request.timeout,
731 )
732 .await
733 }
734
735 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
737 self.procedure_client()?.reconcile(request).await
738 }
739
740 pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
742 self.procedure_client()?.gc_regions(request).await
743 }
744
745 pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
747 self.procedure_client()?.gc_table(request).await
748 }
749
750 pub async fn submit_ddl_task(
752 &self,
753 req: SubmitDdlTaskRequest,
754 ) -> Result<SubmitDdlTaskResponse> {
755 let res = self
756 .procedure_client()?
757 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
758 .await?
759 .try_into()
760 .context(ConvertMetaResponseSnafu)?;
761
762 Ok(res)
763 }
764
765 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
766 self.heartbeat.clone().context(NotStartedSnafu {
767 name: "heartbeat_client",
768 })
769 }
770
771 pub fn config_client(&self) -> Result<ConfigClient> {
772 self.config.clone().context(NotStartedSnafu {
773 name: "config_client",
774 })
775 }
776
777 pub fn store_client(&self) -> Result<StoreClient> {
778 self.store.clone().context(NotStartedSnafu {
779 name: "store_client",
780 })
781 }
782
783 pub fn procedure_client(&self) -> Result<ProcedureClient> {
784 self.procedure.clone().context(NotStartedSnafu {
785 name: "procedure_client",
786 })
787 }
788
789 pub fn cluster_client(&self) -> Result<ClusterClient> {
790 self.cluster.clone().context(NotStartedSnafu {
791 name: "cluster_client",
792 })
793 }
794
795 pub fn channel_config(&self) -> &ChannelConfig {
796 self.channel_manager.config()
797 }
798
799 pub fn id(&self) -> Id {
800 self.id
801 }
802}
803
804#[cfg(test)]
805mod tests {
806 use std::sync::atomic::{AtomicUsize, Ordering};
807
808 use api::v1::meta::{HeartbeatRequest, Peer};
809 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
810 use rand::Rng;
811
812 use super::*;
813 use crate::error;
814 use crate::mocks::{self, MockMetaContext};
815
816 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
817
818 struct TestClient {
819 ns: String,
820 client: MetaClient,
821 meta_ctx: MockMetaContext,
822 }
823
824 impl TestClient {
825 async fn new(ns: impl Into<String>) -> Self {
826 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
828 Self {
829 ns: ns.into(),
830 client,
831 meta_ctx,
832 }
833 }
834
835 fn key(&self, name: &str) -> Vec<u8> {
836 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
837 }
838
839 async fn gen_data(&self) {
840 for i in 0..10 {
841 let req = PutRequest::new()
842 .with_key(self.key(&format!("key-{i}")))
843 .with_value(format!("{}-{}", "value", i).into_bytes())
844 .with_prev_kv();
845 let res = self.client.put(req).await;
846 let _ = res.unwrap();
847 }
848 }
849
850 async fn clear_data(&self) {
851 let req =
852 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
853 let res = self.client.delete_range(req).await;
854 let _ = res.unwrap();
855 }
856
857 #[allow(dead_code)]
858 fn kv_backend(&self) -> KvBackendRef {
859 self.meta_ctx.kv_backend.clone()
860 }
861
862 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
863 self.meta_ctx.in_memory.clone()
864 }
865 }
866
867 async fn new_client(ns: impl Into<String>) -> TestClient {
868 let client = TestClient::new(ns).await;
869 client.clear_data().await;
870 client
871 }
872
873 #[tokio::test]
874 async fn test_meta_client_builder() {
875 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
876
877 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
878 .enable_heartbeat()
879 .build();
880 let _ = meta_client.heartbeat_client().unwrap();
881 assert!(meta_client.store_client().is_err());
882 meta_client.start(urls).await.unwrap();
883
884 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
885 assert!(meta_client.heartbeat_client().is_err());
886 assert!(meta_client.store_client().is_err());
887 meta_client.start(urls).await.unwrap();
888
889 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
890 .enable_store()
891 .build();
892 assert!(meta_client.heartbeat_client().is_err());
893 let _ = meta_client.store_client().unwrap();
894 meta_client.start(urls).await.unwrap();
895
896 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
897 .enable_heartbeat()
898 .enable_store()
899 .build();
900 assert_eq!(2, meta_client.id());
901 assert_eq!(2, meta_client.id());
902 let _ = meta_client.heartbeat_client().unwrap();
903 let _ = meta_client.store_client().unwrap();
904 meta_client.start(urls).await.unwrap();
905 }
906
907 #[tokio::test]
908 async fn test_not_start_heartbeat_client() {
909 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
910 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
911 .enable_store()
912 .build();
913 meta_client.start(urls).await.unwrap();
914 let res = meta_client.ask_leader().await;
915 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
916 }
917
918 #[tokio::test]
919 async fn test_not_start_store_client() {
920 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
921 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
922 .enable_heartbeat()
923 .build();
924
925 meta_client.start(urls).await.unwrap();
926 let res = meta_client.put(PutRequest::default()).await;
927 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
928 }
929
930 #[tokio::test]
931 async fn test_ask_leader() {
932 let tc = new_client("test_ask_leader").await;
933 tc.client.ask_leader().await.unwrap();
934 }
935
936 #[tokio::test]
937 async fn test_heartbeat() {
938 let tc = new_client("test_heartbeat").await;
939 let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
940 let request_sent = Arc::new(AtomicUsize::new(0));
943 let request_sent_clone = request_sent.clone();
944 let _handle = tokio::spawn(async move {
945 for _ in 0..5 {
946 let req = HeartbeatRequest {
947 peer: Some(Peer {
948 id: 1,
949 addr: "meta_client_peer".to_string(),
950 }),
951 ..Default::default()
952 };
953 sender.send(req).await.unwrap();
954 request_sent_clone.fetch_add(1, Ordering::Relaxed);
955 }
956 });
957
958 let heartbeat_count = Arc::new(AtomicUsize::new(0));
959 let heartbeat_count_clone = heartbeat_count.clone();
960 let handle = tokio::spawn(async move {
961 while let Some(_resp) = receiver.message().await.unwrap() {
962 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
963 }
964 });
965
966 handle.await.unwrap();
967 assert_eq!(
969 request_sent.load(Ordering::Relaxed) + 1,
970 heartbeat_count.load(Ordering::Relaxed)
971 );
972 }
973
974 #[tokio::test]
975 async fn test_range_get() {
976 let tc = new_client("test_range_get").await;
977 tc.gen_data().await;
978
979 let key = tc.key("key-0");
980 let req = RangeRequest::new().with_key(key.as_slice());
981 let res = tc.client.range(req).await;
982 let mut kvs = res.unwrap().take_kvs();
983 assert_eq!(1, kvs.len());
984 let mut kv = kvs.pop().unwrap();
985 assert_eq!(key, kv.take_key());
986 assert_eq!(b"value-0".to_vec(), kv.take_value());
987 }
988
989 #[tokio::test]
990 async fn test_range_get_prefix() {
991 let tc = new_client("test_range_get_prefix").await;
992 tc.gen_data().await;
993
994 let req = RangeRequest::new().with_prefix(tc.key("key-"));
995 let res = tc.client.range(req).await;
996 let kvs = res.unwrap().take_kvs();
997 assert_eq!(10, kvs.len());
998 for (i, mut kv) in kvs.into_iter().enumerate() {
999 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
1000 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1001 }
1002 }
1003
1004 #[tokio::test]
1005 async fn test_range() {
1006 let tc = new_client("test_range").await;
1007 tc.gen_data().await;
1008
1009 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
1010 let res = tc.client.range(req).await;
1011 let kvs = res.unwrap().take_kvs();
1012 assert_eq!(3, kvs.len());
1013 for (i, mut kv) in kvs.into_iter().enumerate() {
1014 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
1015 assert_eq!(
1016 format!("{}-{}", "value", i + 5).into_bytes(),
1017 kv.take_value()
1018 );
1019 }
1020 }
1021
1022 #[tokio::test]
1023 async fn test_range_keys_only() {
1024 let tc = new_client("test_range_keys_only").await;
1025 tc.gen_data().await;
1026
1027 let req = RangeRequest::new()
1028 .with_range(tc.key("key-5"), tc.key("key-8"))
1029 .with_keys_only();
1030 let res = tc.client.range(req).await;
1031 let kvs = res.unwrap().take_kvs();
1032 assert_eq!(3, kvs.len());
1033 for (i, mut kv) in kvs.into_iter().enumerate() {
1034 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
1035 assert!(kv.take_value().is_empty());
1036 }
1037 }
1038
1039 #[tokio::test]
1040 async fn test_put() {
1041 let tc = new_client("test_put").await;
1042
1043 let req = PutRequest::new()
1044 .with_key(tc.key("key"))
1045 .with_value(b"value".to_vec());
1046 let res = tc.client.put(req).await;
1047 assert!(res.unwrap().prev_kv.is_none());
1048 }
1049
1050 #[tokio::test]
1051 async fn test_put_with_prev_kv() {
1052 let tc = new_client("test_put_with_prev_kv").await;
1053
1054 let key = tc.key("key");
1055 let req = PutRequest::new()
1056 .with_key(key.as_slice())
1057 .with_value(b"value".to_vec())
1058 .with_prev_kv();
1059 let res = tc.client.put(req).await;
1060 assert!(res.unwrap().prev_kv.is_none());
1061
1062 let req = PutRequest::new()
1063 .with_key(key.as_slice())
1064 .with_value(b"value1".to_vec())
1065 .with_prev_kv();
1066 let res = tc.client.put(req).await;
1067 let mut kv = res.unwrap().prev_kv.unwrap();
1068 assert_eq!(key, kv.take_key());
1069 assert_eq!(b"value".to_vec(), kv.take_value());
1070 }
1071
1072 #[tokio::test]
1073 async fn test_batch_put() {
1074 let tc = new_client("test_batch_put").await;
1075
1076 let mut req = BatchPutRequest::new();
1077 for i in 0..275 {
1078 req = req.add_kv(
1079 tc.key(&format!("key-{}", i)),
1080 format!("value-{}", i).into_bytes(),
1081 );
1082 }
1083
1084 let res = tc.client.batch_put(req).await;
1085 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1086
1087 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1088 let res = tc.client.range(req).await;
1089 let kvs = res.unwrap().take_kvs();
1090 assert_eq!(275, kvs.len());
1091 }
1092
1093 #[tokio::test]
1094 async fn test_batch_get() {
1095 let tc = new_client("test_batch_get").await;
1096 tc.gen_data().await;
1097
1098 let mut req = BatchGetRequest::default();
1099 for i in 0..256 {
1100 req = req.add_key(tc.key(&format!("key-{}", i)));
1101 }
1102 let res = tc.client.batch_get(req).await.unwrap();
1103 assert_eq!(10, res.kvs.len());
1104
1105 let req = BatchGetRequest::default()
1106 .add_key(tc.key("key-1"))
1107 .add_key(tc.key("key-999"));
1108 let res = tc.client.batch_get(req).await.unwrap();
1109 assert_eq!(1, res.kvs.len());
1110 }
1111
1112 #[tokio::test]
1113 async fn test_batch_put_with_prev_kv() {
1114 let tc = new_client("test_batch_put_with_prev_kv").await;
1115
1116 let key = tc.key("key");
1117 let key2 = tc.key("key2");
1118 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1119 let res = tc.client.batch_put(req).await;
1120 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1121
1122 let req = BatchPutRequest::new()
1123 .add_kv(key.as_slice(), b"value-".to_vec())
1124 .add_kv(key2.as_slice(), b"value2-".to_vec())
1125 .with_prev_kv();
1126 let res = tc.client.batch_put(req).await;
1127 let mut kvs = res.unwrap().take_prev_kvs();
1128 assert_eq!(1, kvs.len());
1129 let mut kv = kvs.pop().unwrap();
1130 assert_eq!(key, kv.take_key());
1131 assert_eq!(b"value".to_vec(), kv.take_value());
1132 }
1133
1134 #[tokio::test]
1135 async fn test_compare_and_put() {
1136 let tc = new_client("test_compare_and_put").await;
1137
1138 let key = tc.key("key");
1139 let req = CompareAndPutRequest::new()
1140 .with_key(key.as_slice())
1141 .with_expect(b"expect".to_vec())
1142 .with_value(b"value".to_vec());
1143 let res = tc.client.compare_and_put(req).await;
1144 assert!(!res.unwrap().is_success());
1145
1146 let req = CompareAndPutRequest::new()
1148 .with_key(key.as_slice())
1149 .with_value(b"value".to_vec());
1150 let res = tc.client.compare_and_put(req).await;
1151 let mut res = res.unwrap();
1152 assert!(res.is_success());
1153 assert!(res.take_prev_kv().is_none());
1154
1155 let req = CompareAndPutRequest::new()
1157 .with_key(key.as_slice())
1158 .with_expect(b"not_eq".to_vec())
1159 .with_value(b"value2".to_vec());
1160 let res = tc.client.compare_and_put(req).await;
1161 let mut res = res.unwrap();
1162 assert!(!res.is_success());
1163 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1164
1165 let req = CompareAndPutRequest::new()
1167 .with_key(key.as_slice())
1168 .with_expect(b"value".to_vec())
1169 .with_value(b"value2".to_vec());
1170 let res = tc.client.compare_and_put(req).await;
1171 let mut res = res.unwrap();
1172 assert!(res.is_success());
1173
1174 assert!(res.take_prev_kv().is_none());
1176 }
1177
1178 #[tokio::test]
1179 async fn test_delete_with_key() {
1180 let tc = new_client("test_delete_with_key").await;
1181 tc.gen_data().await;
1182
1183 let req = DeleteRangeRequest::new()
1184 .with_key(tc.key("key-0"))
1185 .with_prev_kv();
1186 let res = tc.client.delete_range(req).await;
1187 let mut res = res.unwrap();
1188 assert_eq!(1, res.deleted());
1189 let mut kvs = res.take_prev_kvs();
1190 assert_eq!(1, kvs.len());
1191 let mut kv = kvs.pop().unwrap();
1192 assert_eq!(b"value-0".to_vec(), kv.take_value());
1193 }
1194
1195 #[tokio::test]
1196 async fn test_delete_with_prefix() {
1197 let tc = new_client("test_delete_with_prefix").await;
1198 tc.gen_data().await;
1199
1200 let req = DeleteRangeRequest::new()
1201 .with_prefix(tc.key("key-"))
1202 .with_prev_kv();
1203 let res = tc.client.delete_range(req).await;
1204 let mut res = res.unwrap();
1205 assert_eq!(10, res.deleted());
1206 let kvs = res.take_prev_kvs();
1207 assert_eq!(10, kvs.len());
1208 for (i, mut kv) in kvs.into_iter().enumerate() {
1209 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1210 }
1211 }
1212
1213 #[tokio::test]
1214 async fn test_delete_with_range() {
1215 let tc = new_client("test_delete_with_range").await;
1216 tc.gen_data().await;
1217
1218 let req = DeleteRangeRequest::new()
1219 .with_range(tc.key("key-2"), tc.key("key-7"))
1220 .with_prev_kv();
1221 let res = tc.client.delete_range(req).await;
1222 let mut res = res.unwrap();
1223 assert_eq!(5, res.deleted());
1224 let kvs = res.take_prev_kvs();
1225 assert_eq!(5, kvs.len());
1226 for (i, mut kv) in kvs.into_iter().enumerate() {
1227 assert_eq!(
1228 format!("{}-{}", "value", i + 2).into_bytes(),
1229 kv.take_value()
1230 );
1231 }
1232 }
1233
1234 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1235 Ok(())
1236 }
1237
1238 #[tokio::test]
1239 async fn test_cluster_client_adaptive_range() {
1240 let tx = new_client("test_cluster_client").await;
1241 let in_memory = tx.in_memory().unwrap();
1242 let cluster_client = tx.client.cluster_client().unwrap();
1243 let mut rng = rand::rng();
1244
1245 for i in 0..10 {
1247 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1248 in_memory
1249 .put(
1250 PutRequest::new()
1251 .with_key(format!("__prefix/{i}").as_bytes())
1252 .with_value(data.clone()),
1253 )
1254 .await
1255 .unwrap();
1256 }
1257
1258 let req = RangeRequest::new().with_prefix(b"__prefix/");
1259 let stream =
1260 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1261
1262 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1263 assert_eq!(10, res.len());
1264 }
1265}