1mod ask_leader;
16mod config;
17pub mod heartbeat;
18mod load_balance;
19mod procedure;
20
21mod cluster;
22mod store;
23mod util;
24
25use std::fmt::Debug;
26use std::sync::Arc;
27use std::time::Duration;
28
29use api::v1::meta::heartbeat_request::NodeWorkloads;
30use api::v1::meta::{
31 MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
32};
33pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
34use cluster::Client as ClusterClient;
35pub use cluster::ClusterKvBackend;
36use common_error::ext::BoxedError;
37use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
38use common_meta::cluster::{
39 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
40};
41use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
42use common_meta::distributed_time_constants::default_distributed_time_constants;
43use common_meta::error::{
44 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
45};
46use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
47use common_meta::kv_backend::KvBackendRef;
48use common_meta::peer::{Peer, PeerDiscovery};
49use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
50use common_meta::range_stream::PaginationStream;
51use common_meta::rpc::KeyValue;
52use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
53use common_meta::rpc::procedure::{
54 AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
55 GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
56 ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
57};
58use common_meta::rpc::store::{
59 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
60 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
61 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
62};
63use common_options::plugin_options::PluginOptionsDeserializer;
64use common_telemetry::info;
65use common_time::util::DefaultSystemTimer;
66use config::Client as ConfigClient;
67use futures::TryStreamExt;
68use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
69use procedure::Client as ProcedureClient;
70use serde::de::DeserializeOwned;
71use snafu::{OptionExt, ResultExt};
72use store::Client as StoreClient;
73
74pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
75use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
76use crate::error::{
77 ConvertMetaConfigSnafu, ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error,
78 GetFlowStatSnafu, NotStartedSnafu, Result,
79};
80
81pub type Id = u64;
82
83const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
84const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
85const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
86const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
87
88#[derive(Clone, Debug, Default)]
89pub struct MetaClientBuilder {
90 id: Id,
91 role: Role,
92 enable_heartbeat: bool,
93 enable_store: bool,
94 enable_procedure: bool,
95 enable_access_cluster_info: bool,
96 region_follower: Option<RegionFollowerClientRef>,
97 channel_manager: Option<ChannelManager>,
98 ddl_channel_manager: Option<ChannelManager>,
99 ddl_timeout: Option<Duration>,
101 heartbeat_channel_manager: Option<ChannelManager>,
102}
103
104impl MetaClientBuilder {
105 pub fn new(member_id: u64, role: Role) -> Self {
106 Self {
107 id: member_id,
108 role,
109 ..Default::default()
110 }
111 }
112
113 pub fn frontend_default_options() -> Self {
115 Self::new(0, Role::Frontend)
117 .enable_store()
118 .enable_heartbeat()
119 .enable_procedure()
120 .enable_access_cluster_info()
121 }
122
123 pub fn datanode_default_options(member_id: u64) -> Self {
125 Self::new(member_id, Role::Datanode)
126 .enable_store()
127 .enable_heartbeat()
128 }
129
130 pub fn flownode_default_options(member_id: u64) -> Self {
132 Self::new(member_id, Role::Flownode)
133 .enable_store()
134 .enable_heartbeat()
135 .enable_procedure()
136 .enable_access_cluster_info()
137 }
138
139 pub fn enable_heartbeat(self) -> Self {
140 Self {
141 enable_heartbeat: true,
142 ..self
143 }
144 }
145
146 pub fn enable_store(self) -> Self {
147 Self {
148 enable_store: true,
149 ..self
150 }
151 }
152
153 pub fn enable_procedure(self) -> Self {
154 Self {
155 enable_procedure: true,
156 ..self
157 }
158 }
159
160 pub fn enable_access_cluster_info(self) -> Self {
161 Self {
162 enable_access_cluster_info: true,
163 ..self
164 }
165 }
166
167 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
168 Self {
169 channel_manager: Some(channel_manager),
170 ..self
171 }
172 }
173
174 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
175 Self {
176 ddl_channel_manager: Some(channel_manager),
177 ..self
178 }
179 }
180
181 pub fn ddl_timeout(self, timeout: Duration) -> Self {
182 Self {
183 ddl_timeout: Some(timeout),
184 ..self
185 }
186 }
187
188 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
189 Self {
190 heartbeat_channel_manager: Some(channel_manager),
191 ..self
192 }
193 }
194
195 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
196 Self {
197 region_follower: Some(region_follower),
198 ..self
199 }
200 }
201
202 pub fn build(self) -> MetaClient {
203 let mgr = self.channel_manager.unwrap_or_default();
204 let heartbeat_channel_manager = self
205 .heartbeat_channel_manager
206 .clone()
207 .unwrap_or_else(|| mgr.clone());
208
209 let heartbeat = self.enable_heartbeat.then(|| {
210 if self.heartbeat_channel_manager.is_some() {
211 info!("Enable heartbeat channel using the heartbeat channel manager.");
212 }
213
214 HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
215 });
216 let config = self
217 .enable_heartbeat
218 .then(|| ConfigClient::new(self.id, self.role, mgr.clone()));
219 let store = self
220 .enable_store
221 .then(|| StoreClient::new(self.id, self.role, mgr.clone()));
222 let procedure = self.enable_procedure.then(|| {
223 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
224 ProcedureClient::new(
225 self.id,
226 self.role,
227 mgr,
228 DEFAULT_SUBMIT_DDL_MAX_RETRY,
229 self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
230 )
231 });
232 let cluster = self
233 .enable_access_cluster_info
234 .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
235 let region_follower = self.region_follower.clone();
236
237 MetaClient {
238 id: self.id,
239 channel_manager: mgr.clone(),
240 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
241 self.id,
242 self.role,
243 DEFAULT_ASK_LEADER_MAX_RETRY,
244 heartbeat_channel_manager,
245 )),
246 heartbeat,
247 config,
248 store,
249 procedure,
250 cluster,
251 region_follower,
252 }
253 }
254}
255
256#[derive(Debug)]
257pub struct MetaClient {
258 id: Id,
259 channel_manager: ChannelManager,
260 leader_provider_factory: LeaderProviderFactoryRef,
261 heartbeat: Option<HeartbeatClient>,
262 config: Option<ConfigClient>,
263 store: Option<StoreClient>,
264 procedure: Option<ProcedureClient>,
265 cluster: Option<ClusterClient>,
266 region_follower: Option<RegionFollowerClientRef>,
267}
268
269impl MetaClient {
270 pub fn new(id: Id, role: Role) -> Self {
271 Self {
272 id,
273 channel_manager: ChannelManager::default(),
274 leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
275 id,
276 role,
277 DEFAULT_ASK_LEADER_MAX_RETRY,
278 ChannelManager::default(),
279 )),
280 heartbeat: None,
281 config: None,
282 store: None,
283 procedure: None,
284 cluster: None,
285 region_follower: None,
286 }
287 }
288}
289
290pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
291
292#[async_trait::async_trait]
294pub trait RegionFollowerClient: Sync + Send + Debug {
295 async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
296
297 async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
298
299 async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
300
301 async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
302
303 async fn start(&self, urls: &[&str]) -> Result<()>;
304
305 async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
306}
307
308#[async_trait::async_trait]
309impl ProcedureExecutor for MetaClient {
310 async fn submit_ddl_task(
311 &self,
312 _ctx: &ExecutorContext,
313 request: SubmitDdlTaskRequest,
314 ) -> MetaResult<SubmitDdlTaskResponse> {
315 self.submit_ddl_task(request)
316 .await
317 .map_err(BoxedError::new)
318 .context(meta_error::ExternalSnafu)
319 }
320
321 async fn migrate_region(
322 &self,
323 _ctx: &ExecutorContext,
324 request: MigrateRegionRequest,
325 ) -> MetaResult<MigrateRegionResponse> {
326 self.migrate_region(request)
327 .await
328 .map_err(BoxedError::new)
329 .context(meta_error::ExternalSnafu)
330 }
331
332 async fn reconcile(
333 &self,
334 _ctx: &ExecutorContext,
335 request: ReconcileRequest,
336 ) -> MetaResult<ReconcileResponse> {
337 self.reconcile(request)
338 .await
339 .map_err(BoxedError::new)
340 .context(meta_error::ExternalSnafu)
341 }
342
343 async fn manage_region_follower(
344 &self,
345 _ctx: &ExecutorContext,
346 request: ManageRegionFollowerRequest,
347 ) -> MetaResult<()> {
348 if let Some(region_follower) = &self.region_follower {
349 match request {
350 ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
351 region_follower
352 .add_region_follower(add_region_follower_request)
353 .await
354 }
355 ManageRegionFollowerRequest::RemoveRegionFollower(
356 remove_region_follower_request,
357 ) => {
358 region_follower
359 .remove_region_follower(remove_region_follower_request)
360 .await
361 }
362 ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
363 region_follower
364 .add_table_follower(add_table_follower_request)
365 .await
366 }
367 ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
368 region_follower
369 .remove_table_follower(remove_table_follower_request)
370 .await
371 }
372 }
373 .map_err(BoxedError::new)
374 .context(meta_error::ExternalSnafu)
375 } else {
376 UnsupportedSnafu {
377 operation: "manage_region_follower",
378 }
379 .fail()
380 }
381 }
382
383 async fn query_procedure_state(
384 &self,
385 _ctx: &ExecutorContext,
386 pid: &str,
387 ) -> MetaResult<ProcedureStateResponse> {
388 self.query_procedure_state(pid)
389 .await
390 .map_err(BoxedError::new)
391 .context(meta_error::ExternalSnafu)
392 }
393
394 async fn gc_regions(
395 &self,
396 _ctx: &ExecutorContext,
397 request: GcRegionsRequest,
398 ) -> MetaResult<GcResponse> {
399 self.gc_regions(request)
400 .await
401 .map_err(BoxedError::new)
402 .context(meta_error::ExternalSnafu)
403 }
404
405 async fn gc_table(
406 &self,
407 _ctx: &ExecutorContext,
408 request: GcTableRequest,
409 ) -> MetaResult<GcResponse> {
410 self.gc_table(request)
411 .await
412 .map_err(BoxedError::new)
413 .context(meta_error::ExternalSnafu)
414 }
415
416 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
417 self.procedure_client()
418 .map_err(BoxedError::new)
419 .context(meta_error::ExternalSnafu)?
420 .list_procedures()
421 .await
422 .map_err(BoxedError::new)
423 .context(meta_error::ExternalSnafu)
424 }
425}
426
427#[allow(deprecated)]
429#[async_trait::async_trait]
430impl ClusterInfo for MetaClient {
431 type Error = Error;
432
433 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
434 let cluster_client = self.cluster_client()?;
435
436 let (get_metasrv_nodes, nodes_key_prefix) = match role {
437 None => (true, Some(NodeInfoKey::key_prefix())),
438 Some(ClusterRole::Metasrv) => (true, None),
439 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
440 };
441
442 let mut nodes = if get_metasrv_nodes {
443 let last_activity_ts = -1; let (leader, followers): (Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>) =
446 cluster_client.get_metasrv_peers().await?;
447 followers
448 .into_iter()
449 .map(|node| {
450 if let Some(node_info) = node.info {
451 NodeInfo {
452 peer: node.peer.unwrap_or_default(),
453 last_activity_ts,
454 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
455 version: node_info.version,
456 git_commit: node_info.git_commit,
457 start_time_ms: node_info.start_time_ms,
458 total_cpu_millicores: node_info.total_cpu_millicores,
459 total_memory_bytes: node_info.total_memory_bytes,
460 cpu_usage_millicores: node_info.cpu_usage_millicores,
461 memory_usage_bytes: node_info.memory_usage_bytes,
462 hostname: node_info.hostname,
463 }
464 } else {
465 NodeInfo {
467 peer: node.peer.unwrap_or_default(),
468 last_activity_ts,
469 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
470 version: node.version,
471 git_commit: node.git_commit,
472 start_time_ms: node.start_time_ms,
473 total_cpu_millicores: node.cpus as i64,
474 total_memory_bytes: node.memory_bytes as i64,
475 cpu_usage_millicores: 0,
476 memory_usage_bytes: 0,
477 hostname: "".to_string(),
478 }
479 }
480 })
481 .chain(leader.into_iter().map(|node| {
482 if let Some(node_info) = node.info {
483 NodeInfo {
484 peer: node.peer.unwrap_or_default(),
485 last_activity_ts,
486 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
487 version: node_info.version,
488 git_commit: node_info.git_commit,
489 start_time_ms: node_info.start_time_ms,
490 total_cpu_millicores: node_info.total_cpu_millicores,
491 total_memory_bytes: node_info.total_memory_bytes,
492 cpu_usage_millicores: node_info.cpu_usage_millicores,
493 memory_usage_bytes: node_info.memory_usage_bytes,
494 hostname: node_info.hostname,
495 }
496 } else {
497 NodeInfo {
499 peer: node.peer.unwrap_or_default(),
500 last_activity_ts,
501 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
502 version: node.version,
503 git_commit: node.git_commit,
504 start_time_ms: node.start_time_ms,
505 total_cpu_millicores: node.cpus as i64,
506 total_memory_bytes: node.memory_bytes as i64,
507 cpu_usage_millicores: 0,
508 memory_usage_bytes: 0,
509 hostname: "".to_string(),
510 }
511 }
512 }))
513 .collect::<Vec<_>>()
514 } else {
515 Vec::new()
516 };
517
518 if let Some(prefix) = nodes_key_prefix {
519 let req = RangeRequest::new().with_prefix(prefix);
520 let res = cluster_client.range(req).await?;
521 for kv in res.kvs {
522 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
523 }
524 }
525
526 Ok(nodes)
527 }
528
529 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
530 let cluster_kv_backend = Arc::new(self.cluster_client()?);
531 let range_prefix = DatanodeStatKey::prefix_key();
532 let req = RangeRequest::new().with_prefix(range_prefix);
533 let stream =
534 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
535 let mut datanode_stats = stream
536 .try_collect::<Vec<_>>()
537 .await
538 .context(ConvertMetaResponseSnafu)?;
539 let region_stats = datanode_stats
540 .iter_mut()
541 .flat_map(|datanode_stat| {
542 let last = datanode_stat.stats.pop();
543 last.map(|stat| stat.region_stats).unwrap_or_default()
544 })
545 .collect::<Vec<_>>();
546
547 Ok(region_stats)
548 }
549
550 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
551 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
552 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
553 let flow_state_manager = FlowStateManager::new(cluster_backend);
554 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
555
556 Ok(res.map(|r| r.into()))
557 }
558}
559
560#[async_trait::async_trait]
563impl PeerDiscovery for MetaClient {
564 async fn active_frontends(&self) -> MetaResult<Vec<Peer>> {
565 let nodes = self
566 .list_nodes(Some(ClusterRole::Frontend))
567 .await
568 .map_err(BoxedError::new)
569 .context(ExternalSnafu)?;
570 Ok(util::alive_frontends(
571 &DefaultSystemTimer,
572 nodes,
573 default_distributed_time_constants().frontend_heartbeat_interval,
576 ))
577 }
578
579 async fn active_datanodes(
580 &self,
581 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
582 ) -> MetaResult<Vec<Peer>> {
583 let nodes = self
584 .list_nodes(Some(ClusterRole::Datanode))
585 .await
586 .map_err(BoxedError::new)
587 .context(ExternalSnafu)?;
588 Ok(util::alive_datanodes(
589 &DefaultSystemTimer,
590 nodes,
591 default_distributed_time_constants().datanode_lease,
592 filter,
593 ))
594 }
595
596 async fn active_flownodes(
597 &self,
598 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
599 ) -> MetaResult<Vec<Peer>> {
600 let nodes = self
601 .list_nodes(Some(ClusterRole::Flownode))
602 .await
603 .map_err(BoxedError::new)
604 .context(ExternalSnafu)?;
605 Ok(util::alive_flownodes(
606 &DefaultSystemTimer,
607 nodes,
608 default_distributed_time_constants().flownode_lease,
609 filter,
610 ))
611 }
612}
613
614fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
615 DatanodeStatValue::try_from(kv.value)
616 .map_err(BoxedError::new)
617 .context(ExternalSnafu)
618}
619
620impl MetaClient {
621 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
622 where
623 U: AsRef<str>,
624 A: AsRef<[U]> + Clone,
625 {
626 info!("MetaClient channel config: {:?}", self.channel_config());
627
628 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
629 let leader_provider = self.leader_provider_factory.create(&urls);
630
631 self.start_with(leader_provider, urls).await
632 }
633
634 pub(crate) async fn start_with<U, A>(
636 &mut self,
637 leader_provider: LeaderProviderRef,
638 peers: A,
639 ) -> Result<()>
640 where
641 U: AsRef<str>,
642 A: AsRef<[U]> + Clone,
643 {
644 if let Some(client) = &self.region_follower {
645 info!("Starting region follower client ...");
646 client.start_with(leader_provider.clone()).await?;
647 }
648
649 if let Some(client) = &self.heartbeat {
650 info!("Starting heartbeat client ...");
651 client.start_with(leader_provider.clone()).await?;
652 }
653
654 if let Some(client) = &self.config {
655 info!("Starting config client ...");
656 client.start_with(leader_provider.clone()).await?;
657 }
658
659 if let Some(client) = &mut self.store {
660 info!("Starting store client ...");
661 client.start(peers.clone()).await?;
662 }
663
664 if let Some(client) = &self.procedure {
665 info!("Starting procedure client ...");
666 client.start_with(leader_provider.clone()).await?;
667 }
668
669 if let Some(client) = &mut self.cluster {
670 info!("Starting cluster client ...");
671 client.start_with(leader_provider).await?;
672 }
673 Ok(())
674 }
675
676 pub async fn ask_leader(&self) -> Result<String> {
679 self.heartbeat_client()?.ask_leader().await
680 }
681
682 pub async fn pull_config<T, U>(&self, deserializer: T) -> Result<U>
683 where
684 T: PluginOptionsDeserializer<U>,
685 U: DeserializeOwned,
686 {
687 let res = self.config_client()?.pull_config().await?;
688 let v = deserializer
689 .deserialize(&res.payload)
690 .context(ConvertMetaConfigSnafu)?;
691 Ok(v)
692 }
693
694 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
703 self.heartbeat_client()?.heartbeat().await
704 }
705
706 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
708 self.store_client()?
709 .range(req.into())
710 .await?
711 .try_into()
712 .context(ConvertMetaResponseSnafu)
713 }
714
715 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
717 self.store_client()?
718 .put(req.into())
719 .await?
720 .try_into()
721 .context(ConvertMetaResponseSnafu)
722 }
723
724 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
726 self.store_client()?
727 .batch_get(req.into())
728 .await?
729 .try_into()
730 .context(ConvertMetaResponseSnafu)
731 }
732
733 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
735 self.store_client()?
736 .batch_put(req.into())
737 .await?
738 .try_into()
739 .context(ConvertMetaResponseSnafu)
740 }
741
742 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
744 self.store_client()?
745 .batch_delete(req.into())
746 .await?
747 .try_into()
748 .context(ConvertMetaResponseSnafu)
749 }
750
751 pub async fn compare_and_put(
754 &self,
755 req: CompareAndPutRequest,
756 ) -> Result<CompareAndPutResponse> {
757 self.store_client()?
758 .compare_and_put(req.into())
759 .await?
760 .try_into()
761 .context(ConvertMetaResponseSnafu)
762 }
763
764 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
766 self.store_client()?
767 .delete_range(req.into())
768 .await?
769 .try_into()
770 .context(ConvertMetaResponseSnafu)
771 }
772
773 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
775 self.procedure_client()?.query_procedure_state(pid).await
776 }
777
778 pub async fn migrate_region(
780 &self,
781 request: MigrateRegionRequest,
782 ) -> Result<MigrateRegionResponse> {
783 self.procedure_client()?
784 .migrate_region(
785 request.region_id,
786 request.from_peer,
787 request.to_peer,
788 request.timeout,
789 )
790 .await
791 }
792
793 pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
795 self.procedure_client()?.reconcile(request).await
796 }
797
798 pub async fn gc_regions(&self, request: GcRegionsRequest) -> Result<GcResponse> {
800 self.procedure_client()?.gc_regions(request).await
801 }
802
803 pub async fn gc_table(&self, request: GcTableRequest) -> Result<GcResponse> {
805 self.procedure_client()?.gc_table(request).await
806 }
807
808 pub async fn submit_ddl_task(
810 &self,
811 req: SubmitDdlTaskRequest,
812 ) -> Result<SubmitDdlTaskResponse> {
813 let res = self
814 .procedure_client()?
815 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
816 .await?
817 .try_into()
818 .context(ConvertMetaResponseSnafu)?;
819
820 Ok(res)
821 }
822
823 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
824 self.heartbeat.clone().context(NotStartedSnafu {
825 name: "heartbeat_client",
826 })
827 }
828
829 pub fn config_client(&self) -> Result<ConfigClient> {
830 self.config.clone().context(NotStartedSnafu {
831 name: "config_client",
832 })
833 }
834
835 pub fn store_client(&self) -> Result<StoreClient> {
836 self.store.clone().context(NotStartedSnafu {
837 name: "store_client",
838 })
839 }
840
841 pub fn procedure_client(&self) -> Result<ProcedureClient> {
842 self.procedure.clone().context(NotStartedSnafu {
843 name: "procedure_client",
844 })
845 }
846
847 pub fn cluster_client(&self) -> Result<ClusterClient> {
848 self.cluster.clone().context(NotStartedSnafu {
849 name: "cluster_client",
850 })
851 }
852
853 pub fn channel_config(&self) -> &ChannelConfig {
854 self.channel_manager.config()
855 }
856
857 pub fn id(&self) -> Id {
858 self.id
859 }
860}
861
862#[cfg(test)]
863mod tests {
864 use std::sync::atomic::{AtomicUsize, Ordering};
865
866 use api::v1::meta::{HeartbeatRequest, Peer};
867 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
868 use rand::Rng;
869
870 use super::*;
871 use crate::error;
872 use crate::mocks::{self, MockMetaContext};
873
874 const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
875
876 struct TestClient {
877 ns: String,
878 client: MetaClient,
879 meta_ctx: MockMetaContext,
880 }
881
882 impl TestClient {
883 async fn new(ns: impl Into<String>) -> Self {
884 let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
886 Self {
887 ns: ns.into(),
888 client,
889 meta_ctx,
890 }
891 }
892
893 fn key(&self, name: &str) -> Vec<u8> {
894 format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
895 }
896
897 async fn gen_data(&self) {
898 for i in 0..10 {
899 let req = PutRequest::new()
900 .with_key(self.key(&format!("key-{i}")))
901 .with_value(format!("{}-{}", "value", i).into_bytes())
902 .with_prev_kv();
903 let res = self.client.put(req).await;
904 let _ = res.unwrap();
905 }
906 }
907
908 async fn clear_data(&self) {
909 let req =
910 DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
911 let res = self.client.delete_range(req).await;
912 let _ = res.unwrap();
913 }
914
915 #[allow(dead_code)]
916 fn kv_backend(&self) -> KvBackendRef {
917 self.meta_ctx.kv_backend.clone()
918 }
919
920 fn in_memory(&self) -> Option<ResettableKvBackendRef> {
921 self.meta_ctx.in_memory.clone()
922 }
923 }
924
925 async fn new_client(ns: impl Into<String>) -> TestClient {
926 let client = TestClient::new(ns).await;
927 client.clear_data().await;
928 client
929 }
930
931 #[tokio::test]
932 async fn test_meta_client_builder() {
933 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
934
935 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
936 .enable_heartbeat()
937 .build();
938 let _ = meta_client.heartbeat_client().unwrap();
939 assert!(meta_client.store_client().is_err());
940 meta_client.start(urls).await.unwrap();
941
942 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
943 assert!(meta_client.heartbeat_client().is_err());
944 assert!(meta_client.store_client().is_err());
945 meta_client.start(urls).await.unwrap();
946
947 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
948 .enable_store()
949 .build();
950 assert!(meta_client.heartbeat_client().is_err());
951 let _ = meta_client.store_client().unwrap();
952 meta_client.start(urls).await.unwrap();
953
954 let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
955 .enable_heartbeat()
956 .enable_store()
957 .build();
958 assert_eq!(2, meta_client.id());
959 assert_eq!(2, meta_client.id());
960 let _ = meta_client.heartbeat_client().unwrap();
961 let _ = meta_client.store_client().unwrap();
962 meta_client.start(urls).await.unwrap();
963 }
964
965 #[tokio::test]
966 async fn test_not_start_heartbeat_client() {
967 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
968 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
969 .enable_store()
970 .build();
971 meta_client.start(urls).await.unwrap();
972 let res = meta_client.ask_leader().await;
973 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
974 }
975
976 #[tokio::test]
977 async fn test_not_start_store_client() {
978 let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
979 let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
980 .enable_heartbeat()
981 .build();
982
983 meta_client.start(urls).await.unwrap();
984 let res = meta_client.put(PutRequest::default()).await;
985 assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
986 }
987
988 #[tokio::test]
989 async fn test_ask_leader() {
990 let tc = new_client("test_ask_leader").await;
991 tc.client.ask_leader().await.unwrap();
992 }
993
994 #[tokio::test]
995 async fn test_heartbeat() {
996 let tc = new_client("test_heartbeat").await;
997 let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
998 let request_sent = Arc::new(AtomicUsize::new(0));
1001 let request_sent_clone = request_sent.clone();
1002 let _handle = tokio::spawn(async move {
1003 for _ in 0..5 {
1004 let req = HeartbeatRequest {
1005 peer: Some(Peer {
1006 id: 1,
1007 addr: "meta_client_peer".to_string(),
1008 }),
1009 ..Default::default()
1010 };
1011 sender.send(req).await.unwrap();
1012 request_sent_clone.fetch_add(1, Ordering::Relaxed);
1013 }
1014 });
1015
1016 let heartbeat_count = Arc::new(AtomicUsize::new(0));
1017 let heartbeat_count_clone = heartbeat_count.clone();
1018 let handle = tokio::spawn(async move {
1019 while let Some(_resp) = receiver.message().await.unwrap() {
1020 heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
1021 }
1022 });
1023
1024 handle.await.unwrap();
1025 assert_eq!(
1027 request_sent.load(Ordering::Relaxed) + 1,
1028 heartbeat_count.load(Ordering::Relaxed)
1029 );
1030 }
1031
1032 #[tokio::test]
1033 async fn test_range_get() {
1034 let tc = new_client("test_range_get").await;
1035 tc.gen_data().await;
1036
1037 let key = tc.key("key-0");
1038 let req = RangeRequest::new().with_key(key.as_slice());
1039 let res = tc.client.range(req).await;
1040 let mut kvs = res.unwrap().take_kvs();
1041 assert_eq!(1, kvs.len());
1042 let mut kv = kvs.pop().unwrap();
1043 assert_eq!(key, kv.take_key());
1044 assert_eq!(b"value-0".to_vec(), kv.take_value());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_range_get_prefix() {
1049 let tc = new_client("test_range_get_prefix").await;
1050 tc.gen_data().await;
1051
1052 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1053 let res = tc.client.range(req).await;
1054 let kvs = res.unwrap().take_kvs();
1055 assert_eq!(10, kvs.len());
1056 for (i, mut kv) in kvs.into_iter().enumerate() {
1057 assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
1058 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1059 }
1060 }
1061
1062 #[tokio::test]
1063 async fn test_range() {
1064 let tc = new_client("test_range").await;
1065 tc.gen_data().await;
1066
1067 let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
1068 let res = tc.client.range(req).await;
1069 let kvs = res.unwrap().take_kvs();
1070 assert_eq!(3, kvs.len());
1071 for (i, mut kv) in kvs.into_iter().enumerate() {
1072 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
1073 assert_eq!(
1074 format!("{}-{}", "value", i + 5).into_bytes(),
1075 kv.take_value()
1076 );
1077 }
1078 }
1079
1080 #[tokio::test]
1081 async fn test_range_keys_only() {
1082 let tc = new_client("test_range_keys_only").await;
1083 tc.gen_data().await;
1084
1085 let req = RangeRequest::new()
1086 .with_range(tc.key("key-5"), tc.key("key-8"))
1087 .with_keys_only();
1088 let res = tc.client.range(req).await;
1089 let kvs = res.unwrap().take_kvs();
1090 assert_eq!(3, kvs.len());
1091 for (i, mut kv) in kvs.into_iter().enumerate() {
1092 assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
1093 assert!(kv.take_value().is_empty());
1094 }
1095 }
1096
1097 #[tokio::test]
1098 async fn test_put() {
1099 let tc = new_client("test_put").await;
1100
1101 let req = PutRequest::new()
1102 .with_key(tc.key("key"))
1103 .with_value(b"value".to_vec());
1104 let res = tc.client.put(req).await;
1105 assert!(res.unwrap().prev_kv.is_none());
1106 }
1107
1108 #[tokio::test]
1109 async fn test_put_with_prev_kv() {
1110 let tc = new_client("test_put_with_prev_kv").await;
1111
1112 let key = tc.key("key");
1113 let req = PutRequest::new()
1114 .with_key(key.as_slice())
1115 .with_value(b"value".to_vec())
1116 .with_prev_kv();
1117 let res = tc.client.put(req).await;
1118 assert!(res.unwrap().prev_kv.is_none());
1119
1120 let req = PutRequest::new()
1121 .with_key(key.as_slice())
1122 .with_value(b"value1".to_vec())
1123 .with_prev_kv();
1124 let res = tc.client.put(req).await;
1125 let mut kv = res.unwrap().prev_kv.unwrap();
1126 assert_eq!(key, kv.take_key());
1127 assert_eq!(b"value".to_vec(), kv.take_value());
1128 }
1129
1130 #[tokio::test]
1131 async fn test_batch_put() {
1132 let tc = new_client("test_batch_put").await;
1133
1134 let mut req = BatchPutRequest::new();
1135 for i in 0..275 {
1136 req = req.add_kv(
1137 tc.key(&format!("key-{}", i)),
1138 format!("value-{}", i).into_bytes(),
1139 );
1140 }
1141
1142 let res = tc.client.batch_put(req).await;
1143 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1144
1145 let req = RangeRequest::new().with_prefix(tc.key("key-"));
1146 let res = tc.client.range(req).await;
1147 let kvs = res.unwrap().take_kvs();
1148 assert_eq!(275, kvs.len());
1149 }
1150
1151 #[tokio::test]
1152 async fn test_batch_get() {
1153 let tc = new_client("test_batch_get").await;
1154 tc.gen_data().await;
1155
1156 let mut req = BatchGetRequest::default();
1157 for i in 0..256 {
1158 req = req.add_key(tc.key(&format!("key-{}", i)));
1159 }
1160 let res = tc.client.batch_get(req).await.unwrap();
1161 assert_eq!(10, res.kvs.len());
1162
1163 let req = BatchGetRequest::default()
1164 .add_key(tc.key("key-1"))
1165 .add_key(tc.key("key-999"));
1166 let res = tc.client.batch_get(req).await.unwrap();
1167 assert_eq!(1, res.kvs.len());
1168 }
1169
1170 #[tokio::test]
1171 async fn test_batch_put_with_prev_kv() {
1172 let tc = new_client("test_batch_put_with_prev_kv").await;
1173
1174 let key = tc.key("key");
1175 let key2 = tc.key("key2");
1176 let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
1177 let res = tc.client.batch_put(req).await;
1178 assert_eq!(0, res.unwrap().take_prev_kvs().len());
1179
1180 let req = BatchPutRequest::new()
1181 .add_kv(key.as_slice(), b"value-".to_vec())
1182 .add_kv(key2.as_slice(), b"value2-".to_vec())
1183 .with_prev_kv();
1184 let res = tc.client.batch_put(req).await;
1185 let mut kvs = res.unwrap().take_prev_kvs();
1186 assert_eq!(1, kvs.len());
1187 let mut kv = kvs.pop().unwrap();
1188 assert_eq!(key, kv.take_key());
1189 assert_eq!(b"value".to_vec(), kv.take_value());
1190 }
1191
1192 #[tokio::test]
1193 async fn test_compare_and_put() {
1194 let tc = new_client("test_compare_and_put").await;
1195
1196 let key = tc.key("key");
1197 let req = CompareAndPutRequest::new()
1198 .with_key(key.as_slice())
1199 .with_expect(b"expect".to_vec())
1200 .with_value(b"value".to_vec());
1201 let res = tc.client.compare_and_put(req).await;
1202 assert!(!res.unwrap().is_success());
1203
1204 let req = CompareAndPutRequest::new()
1206 .with_key(key.as_slice())
1207 .with_value(b"value".to_vec());
1208 let res = tc.client.compare_and_put(req).await;
1209 let mut res = res.unwrap();
1210 assert!(res.is_success());
1211 assert!(res.take_prev_kv().is_none());
1212
1213 let req = CompareAndPutRequest::new()
1215 .with_key(key.as_slice())
1216 .with_expect(b"not_eq".to_vec())
1217 .with_value(b"value2".to_vec());
1218 let res = tc.client.compare_and_put(req).await;
1219 let mut res = res.unwrap();
1220 assert!(!res.is_success());
1221 assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
1222
1223 let req = CompareAndPutRequest::new()
1225 .with_key(key.as_slice())
1226 .with_expect(b"value".to_vec())
1227 .with_value(b"value2".to_vec());
1228 let res = tc.client.compare_and_put(req).await;
1229 let mut res = res.unwrap();
1230 assert!(res.is_success());
1231
1232 assert!(res.take_prev_kv().is_none());
1234 }
1235
1236 #[tokio::test]
1237 async fn test_delete_with_key() {
1238 let tc = new_client("test_delete_with_key").await;
1239 tc.gen_data().await;
1240
1241 let req = DeleteRangeRequest::new()
1242 .with_key(tc.key("key-0"))
1243 .with_prev_kv();
1244 let res = tc.client.delete_range(req).await;
1245 let mut res = res.unwrap();
1246 assert_eq!(1, res.deleted());
1247 let mut kvs = res.take_prev_kvs();
1248 assert_eq!(1, kvs.len());
1249 let mut kv = kvs.pop().unwrap();
1250 assert_eq!(b"value-0".to_vec(), kv.take_value());
1251 }
1252
1253 #[tokio::test]
1254 async fn test_delete_with_prefix() {
1255 let tc = new_client("test_delete_with_prefix").await;
1256 tc.gen_data().await;
1257
1258 let req = DeleteRangeRequest::new()
1259 .with_prefix(tc.key("key-"))
1260 .with_prev_kv();
1261 let res = tc.client.delete_range(req).await;
1262 let mut res = res.unwrap();
1263 assert_eq!(10, res.deleted());
1264 let kvs = res.take_prev_kvs();
1265 assert_eq!(10, kvs.len());
1266 for (i, mut kv) in kvs.into_iter().enumerate() {
1267 assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
1268 }
1269 }
1270
1271 #[tokio::test]
1272 async fn test_delete_with_range() {
1273 let tc = new_client("test_delete_with_range").await;
1274 tc.gen_data().await;
1275
1276 let req = DeleteRangeRequest::new()
1277 .with_range(tc.key("key-2"), tc.key("key-7"))
1278 .with_prev_kv();
1279 let res = tc.client.delete_range(req).await;
1280 let mut res = res.unwrap();
1281 assert_eq!(5, res.deleted());
1282 let kvs = res.take_prev_kvs();
1283 assert_eq!(5, kvs.len());
1284 for (i, mut kv) in kvs.into_iter().enumerate() {
1285 assert_eq!(
1286 format!("{}-{}", "value", i + 2).into_bytes(),
1287 kv.take_value()
1288 );
1289 }
1290 }
1291
1292 fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
1293 Ok(())
1294 }
1295
1296 #[tokio::test]
1297 async fn test_cluster_client_adaptive_range() {
1298 let tx = new_client("test_cluster_client").await;
1299 let in_memory = tx.in_memory().unwrap();
1300 let cluster_client = tx.client.cluster_client().unwrap();
1301 let mut rng = rand::rng();
1302
1303 for i in 0..10 {
1305 let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
1306 in_memory
1307 .put(
1308 PutRequest::new()
1309 .with_key(format!("__prefix/{i}").as_bytes())
1310 .with_value(data.clone()),
1311 )
1312 .await
1313 .unwrap();
1314 }
1315
1316 let req = RangeRequest::new().with_prefix(b"__prefix/");
1317 let stream =
1318 PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();
1319
1320 let res = stream.try_collect::<Vec<_>>().await.unwrap();
1321 assert_eq!(10, res.len());
1322 }
1323}