1mod ask_leader;
16mod heartbeat;
17mod load_balance;
18mod procedure;
19
20mod cluster;
21mod store;
22mod util;
23
24use std::fmt::Debug;
25use std::sync::Arc;
26
27use api::v1::meta::{ProcedureDetailResponse, Role};
28pub use ask_leader::AskLeader;
29use cluster::Client as ClusterClient;
30pub use cluster::ClusterKvBackend;
31use common_error::ext::BoxedError;
32use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
33use common_meta::cluster::{
34 ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
35};
36use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
37use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
38use common_meta::error::{
39 self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
40};
41use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::range_stream::PaginationStream;
44use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
45use common_meta::rpc::procedure::{
46 AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
47 RemoveRegionFollowerRequest,
48};
49use common_meta::rpc::store::{
50 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
51 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
52 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
53};
54use common_meta::rpc::KeyValue;
55use common_telemetry::info;
56use futures::TryStreamExt;
57use heartbeat::Client as HeartbeatClient;
58use procedure::Client as ProcedureClient;
59use snafu::{OptionExt, ResultExt};
60use store::Client as StoreClient;
61
62pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
63use crate::error::{
64 ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
65 Result,
66};
67
/// Identifier of a meta client; this is the `member_id` given to
/// [`MetaClientBuilder::new`] and returned by [`MetaClient::id`].
pub type Id = u64;

/// Value handed to the heartbeat client constructor in [`MetaClientBuilder::build`]
/// (presumably the maximum number of ask-leader retries — confirm in `heartbeat`).
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
/// Value handed to the procedure client constructor in [`MetaClientBuilder::build`]
/// (presumably the maximum number of DDL submission retries — confirm in `procedure`).
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
/// Value handed to the cluster client constructor in [`MetaClientBuilder::build`]
/// (presumably the maximum number of retries — confirm in `cluster`).
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
73
/// Builder that decides which sub-clients a [`MetaClient`] is created with.
///
/// All flags default to `false` and all optional values to `None`, so a freshly
/// created builder produces a client without any sub-client.
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
    /// Member id passed to the [`MetaClient`] and to every sub-client that is built.
    id: Id,
    /// Role passed to every sub-client that is built.
    role: Role,
    /// Whether `build` creates the heartbeat client.
    enable_heartbeat: bool,
    /// Whether `build` creates the store client.
    enable_store: bool,
    /// Whether `build` creates the procedure client.
    enable_procedure: bool,
    /// Whether `build` creates the cluster client.
    enable_access_cluster_info: bool,
    /// Optional region follower client installed on the built [`MetaClient`].
    region_follower: Option<RegionFollowerClientRef>,
    /// Channel manager for the [`MetaClient`]; when `None` the client's default one is used.
    channel_manager: Option<ChannelManager>,
    /// Channel manager used only by the procedure client; falls back to the client's own.
    ddl_channel_manager: Option<ChannelManager>,
    /// Channel manager used only by the heartbeat client; falls back to the client's own.
    heartbeat_channel_manager: Option<ChannelManager>,
}
87
88impl MetaClientBuilder {
89 pub fn new(member_id: u64, role: Role) -> Self {
90 Self {
91 id: member_id,
92 role,
93 ..Default::default()
94 }
95 }
96
97 pub fn frontend_default_options() -> Self {
99 Self::new(0, Role::Frontend)
101 .enable_store()
102 .enable_heartbeat()
103 .enable_procedure()
104 .enable_access_cluster_info()
105 }
106
107 pub fn datanode_default_options(member_id: u64) -> Self {
109 Self::new(member_id, Role::Datanode)
110 .enable_store()
111 .enable_heartbeat()
112 }
113
114 pub fn flownode_default_options(member_id: u64) -> Self {
116 Self::new(member_id, Role::Flownode)
117 .enable_store()
118 .enable_heartbeat()
119 .enable_procedure()
120 .enable_access_cluster_info()
121 }
122
123 pub fn enable_heartbeat(self) -> Self {
124 Self {
125 enable_heartbeat: true,
126 ..self
127 }
128 }
129
130 pub fn enable_store(self) -> Self {
131 Self {
132 enable_store: true,
133 ..self
134 }
135 }
136
137 pub fn enable_procedure(self) -> Self {
138 Self {
139 enable_procedure: true,
140 ..self
141 }
142 }
143
144 pub fn enable_access_cluster_info(self) -> Self {
145 Self {
146 enable_access_cluster_info: true,
147 ..self
148 }
149 }
150
151 pub fn channel_manager(self, channel_manager: ChannelManager) -> Self {
152 Self {
153 channel_manager: Some(channel_manager),
154 ..self
155 }
156 }
157
158 pub fn ddl_channel_manager(self, channel_manager: ChannelManager) -> Self {
159 Self {
160 ddl_channel_manager: Some(channel_manager),
161 ..self
162 }
163 }
164
165 pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
166 Self {
167 heartbeat_channel_manager: Some(channel_manager),
168 ..self
169 }
170 }
171
172 pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
173 Self {
174 region_follower: Some(region_follower),
175 ..self
176 }
177 }
178
179 pub fn build(self) -> MetaClient {
180 let mut client = if let Some(mgr) = self.channel_manager {
181 MetaClient::with_channel_manager(self.id, mgr)
182 } else {
183 MetaClient::new(self.id)
184 };
185
186 let mgr = client.channel_manager.clone();
187
188 if self.enable_heartbeat {
189 let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
190 client.heartbeat = Some(HeartbeatClient::new(
191 self.id,
192 self.role,
193 mgr,
194 DEFAULT_ASK_LEADER_MAX_RETRY,
195 ));
196 }
197
198 if self.enable_store {
199 client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
200 }
201
202 if self.enable_procedure {
203 let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
204 client.procedure = Some(ProcedureClient::new(
205 self.id,
206 self.role,
207 mgr,
208 DEFAULT_SUBMIT_DDL_MAX_RETRY,
209 ));
210 }
211
212 if self.enable_access_cluster_info {
213 client.cluster = Some(ClusterClient::new(
214 self.id,
215 self.role,
216 mgr,
217 DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
218 ))
219 }
220
221 if let Some(region_follower) = self.region_follower {
222 client.region_follower = Some(region_follower);
223 }
224
225 client
226 }
227}
228
/// Client facade over the optional heartbeat, store, procedure, cluster and
/// region follower clients.
///
/// Each sub-client is `Some` only when it was enabled through
/// [`MetaClientBuilder`]; the accessor methods return a `NotStarted` error otherwise.
#[derive(Debug, Default)]
pub struct MetaClient {
    /// Identifier given at construction; returned by `id()`.
    id: Id,
    /// Channel manager whose configuration is exposed by `channel_config()`.
    channel_manager: ChannelManager,
    /// Heartbeat client, if enabled.
    heartbeat: Option<HeartbeatClient>,
    /// Store (key-value) client, if enabled.
    store: Option<StoreClient>,
    /// Procedure client, if enabled.
    procedure: Option<ProcedureClient>,
    /// Cluster client, if enabled.
    cluster: Option<ClusterClient>,
    /// Region follower client, if one was supplied to the builder.
    region_follower: Option<RegionFollowerClientRef>,
}
239
/// Shared, dynamically dispatched handle to a [`RegionFollowerClient`].
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
241
/// Interface of a client that handles region follower requests.
///
/// [`MetaClient`] holds an optional implementation, starts it in
/// `MetaClient::start` and forwards the add/remove follower calls of its
/// `ProcedureExecutor` implementation to it.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
    /// Handles an add-region-follower request.
    async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;

    /// Handles a remove-region-follower request.
    async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;

    /// Starts the client with the given URLs (the same ones passed to `MetaClient::start`).
    async fn start(&self, urls: &[&str]) -> Result<()>;
}
251
252#[async_trait::async_trait]
253impl ProcedureExecutor for MetaClient {
254 async fn submit_ddl_task(
255 &self,
256 _ctx: &ExecutorContext,
257 request: SubmitDdlTaskRequest,
258 ) -> MetaResult<SubmitDdlTaskResponse> {
259 self.submit_ddl_task(request)
260 .await
261 .map_err(BoxedError::new)
262 .context(meta_error::ExternalSnafu)
263 }
264
265 async fn migrate_region(
266 &self,
267 _ctx: &ExecutorContext,
268 request: MigrateRegionRequest,
269 ) -> MetaResult<MigrateRegionResponse> {
270 self.migrate_region(request)
271 .await
272 .map_err(BoxedError::new)
273 .context(meta_error::ExternalSnafu)
274 }
275
276 async fn add_region_follower(
277 &self,
278 _ctx: &ExecutorContext,
279 request: AddRegionFollowerRequest,
280 ) -> MetaResult<()> {
281 if let Some(region_follower) = &self.region_follower {
282 region_follower
283 .add_region_follower(request)
284 .await
285 .map_err(BoxedError::new)
286 .context(meta_error::ExternalSnafu)
287 } else {
288 UnsupportedSnafu {
289 operation: "add_region_follower",
290 }
291 .fail()
292 }
293 }
294
295 async fn remove_region_follower(
296 &self,
297 _ctx: &ExecutorContext,
298 request: RemoveRegionFollowerRequest,
299 ) -> MetaResult<()> {
300 if let Some(region_follower) = &self.region_follower {
301 region_follower
302 .remove_region_follower(request)
303 .await
304 .map_err(BoxedError::new)
305 .context(meta_error::ExternalSnafu)
306 } else {
307 UnsupportedSnafu {
308 operation: "remove_region_follower",
309 }
310 .fail()
311 }
312 }
313
314 async fn query_procedure_state(
315 &self,
316 _ctx: &ExecutorContext,
317 pid: &str,
318 ) -> MetaResult<ProcedureStateResponse> {
319 self.query_procedure_state(pid)
320 .await
321 .map_err(BoxedError::new)
322 .context(meta_error::ExternalSnafu)
323 }
324
325 async fn list_procedures(&self, _ctx: &ExecutorContext) -> MetaResult<ProcedureDetailResponse> {
326 self.procedure_client()
327 .map_err(BoxedError::new)
328 .context(meta_error::ExternalSnafu)?
329 .list_procedures()
330 .await
331 .map_err(BoxedError::new)
332 .context(meta_error::ExternalSnafu)
333 }
334}
335
336#[async_trait::async_trait]
337impl ClusterInfo for MetaClient {
338 type Error = Error;
339
340 async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
341 let cluster_client = self.cluster_client()?;
342
343 let (get_metasrv_nodes, nodes_key_prefix) = match role {
344 None => (true, Some(NodeInfoKey::key_prefix())),
345 Some(ClusterRole::Metasrv) => (true, None),
346 Some(role) => (false, Some(NodeInfoKey::key_prefix_with_role(role))),
347 };
348
349 let mut nodes = if get_metasrv_nodes {
350 let last_activity_ts = -1; let (leader, followers) = cluster_client.get_metasrv_peers().await?;
353 followers
354 .into_iter()
355 .map(|node| NodeInfo {
356 peer: node.peer.unwrap_or_default(),
357 last_activity_ts,
358 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
359 version: node.version,
360 git_commit: node.git_commit,
361 start_time_ms: node.start_time_ms,
362 })
363 .chain(leader.into_iter().map(|node| NodeInfo {
364 peer: node.peer.unwrap_or_default(),
365 last_activity_ts,
366 status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
367 version: node.version,
368 git_commit: node.git_commit,
369 start_time_ms: node.start_time_ms,
370 }))
371 .collect::<Vec<_>>()
372 } else {
373 Vec::new()
374 };
375
376 if let Some(prefix) = nodes_key_prefix {
377 let req = RangeRequest::new().with_prefix(prefix);
378 let res = cluster_client.range(req).await?;
379 for kv in res.kvs {
380 nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
381 }
382 }
383
384 Ok(nodes)
385 }
386
387 async fn list_region_stats(&self) -> Result<Vec<RegionStat>> {
388 let cluster_kv_backend = Arc::new(self.cluster_client()?);
389 let range_prefix = DatanodeStatKey::prefix_key();
390 let req = RangeRequest::new().with_prefix(range_prefix);
391 let stream =
392 PaginationStream::new(cluster_kv_backend, req, 256, decode_stats).into_stream();
393 let mut datanode_stats = stream
394 .try_collect::<Vec<_>>()
395 .await
396 .context(ConvertMetaResponseSnafu)?;
397 let region_stats = datanode_stats
398 .iter_mut()
399 .flat_map(|datanode_stat| {
400 let last = datanode_stat.stats.pop();
401 last.map(|stat| stat.region_stats).unwrap_or_default()
402 })
403 .collect::<Vec<_>>();
404
405 Ok(region_stats)
406 }
407
408 async fn list_flow_stats(&self) -> Result<Option<FlowStat>> {
409 let cluster_backend = ClusterKvBackend::new(Arc::new(self.cluster_client()?));
410 let cluster_backend = Arc::new(cluster_backend) as KvBackendRef;
411 let flow_state_manager = FlowStateManager::new(cluster_backend);
412 let res = flow_state_manager.get().await.context(GetFlowStatSnafu)?;
413
414 Ok(res.map(|r| r.into()))
415 }
416}
417
418fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
419 DatanodeStatValue::try_from(kv.value)
420 .map_err(BoxedError::new)
421 .context(ExternalSnafu)
422}
423
424impl MetaClient {
425 pub fn new(id: Id) -> Self {
426 Self {
427 id,
428 ..Default::default()
429 }
430 }
431
432 pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
433 Self {
434 id,
435 channel_manager,
436 ..Default::default()
437 }
438 }
439
440 pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
441 where
442 U: AsRef<str>,
443 A: AsRef<[U]> + Clone,
444 {
445 info!("MetaClient channel config: {:?}", self.channel_config());
446
447 if let Some(client) = &mut self.region_follower {
448 let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
449 client.start(&urls).await?;
450 info!("Region follower client started");
451 }
452 if let Some(client) = &mut self.heartbeat {
453 client.start(urls.clone()).await?;
454 info!("Heartbeat client started");
455 }
456 if let Some(client) = &mut self.store {
457 client.start(urls.clone()).await?;
458 info!("Store client started");
459 }
460 if let Some(client) = &mut self.procedure {
461 client.start(urls.clone()).await?;
462 info!("DDL client started");
463 }
464 if let Some(client) = &mut self.cluster {
465 client.start(urls).await?;
466 info!("Cluster client started");
467 }
468
469 Ok(())
470 }
471
472 pub async fn ask_leader(&self) -> Result<String> {
475 self.heartbeat_client()?.ask_leader().await
476 }
477
478 pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
485 self.heartbeat_client()?.heartbeat().await
486 }
487
488 pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
490 self.store_client()?
491 .range(req.into())
492 .await?
493 .try_into()
494 .context(ConvertMetaResponseSnafu)
495 }
496
497 pub async fn put(&self, req: PutRequest) -> Result<PutResponse> {
499 self.store_client()?
500 .put(req.into())
501 .await?
502 .try_into()
503 .context(ConvertMetaResponseSnafu)
504 }
505
506 pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
508 self.store_client()?
509 .batch_get(req.into())
510 .await?
511 .try_into()
512 .context(ConvertMetaResponseSnafu)
513 }
514
515 pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
517 self.store_client()?
518 .batch_put(req.into())
519 .await?
520 .try_into()
521 .context(ConvertMetaResponseSnafu)
522 }
523
524 pub async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
526 self.store_client()?
527 .batch_delete(req.into())
528 .await?
529 .try_into()
530 .context(ConvertMetaResponseSnafu)
531 }
532
533 pub async fn compare_and_put(
536 &self,
537 req: CompareAndPutRequest,
538 ) -> Result<CompareAndPutResponse> {
539 self.store_client()?
540 .compare_and_put(req.into())
541 .await?
542 .try_into()
543 .context(ConvertMetaResponseSnafu)
544 }
545
546 pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
548 self.store_client()?
549 .delete_range(req.into())
550 .await?
551 .try_into()
552 .context(ConvertMetaResponseSnafu)
553 }
554
555 pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
557 self.procedure_client()?.query_procedure_state(pid).await
558 }
559
560 pub async fn migrate_region(
562 &self,
563 request: MigrateRegionRequest,
564 ) -> Result<MigrateRegionResponse> {
565 self.procedure_client()?
566 .migrate_region(
567 request.region_id,
568 request.from_peer,
569 request.to_peer,
570 request.timeout,
571 )
572 .await
573 }
574
575 pub async fn submit_ddl_task(
577 &self,
578 req: SubmitDdlTaskRequest,
579 ) -> Result<SubmitDdlTaskResponse> {
580 let res = self
581 .procedure_client()?
582 .submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
583 .await?
584 .try_into()
585 .context(ConvertMetaResponseSnafu)?;
586
587 Ok(res)
588 }
589
590 pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
591 self.heartbeat.clone().context(NotStartedSnafu {
592 name: "heartbeat_client",
593 })
594 }
595
596 pub fn store_client(&self) -> Result<StoreClient> {
597 self.store.clone().context(NotStartedSnafu {
598 name: "store_client",
599 })
600 }
601
602 pub fn procedure_client(&self) -> Result<ProcedureClient> {
603 self.procedure.clone().context(NotStartedSnafu {
604 name: "procedure_client",
605 })
606 }
607
608 pub fn cluster_client(&self) -> Result<ClusterClient> {
609 self.cluster.clone().context(NotStartedSnafu {
610 name: "cluster_client",
611 })
612 }
613
614 pub fn channel_config(&self) -> &ChannelConfig {
615 self.channel_manager.config()
616 }
617
618 pub fn id(&self) -> Id {
619 self.id
620 }
621}
622
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use api::v1::meta::{HeartbeatRequest, Peer};
    use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
    use rand::Rng;

    use super::*;
    use crate::error;
    use crate::mocks::{self, MockMetaContext};

    /// Prefix of every key written by these tests.
    const TEST_KEY_PREFIX: &str = "__unit_test__meta__";

    /// A mock-backed [`MetaClient`] together with a per-test namespace that keeps
    /// the keys of different tests apart.
    struct TestClient {
        ns: String,
        client: MetaClient,
        meta_ctx: MockMetaContext,
    }

    impl TestClient {
        /// Creates a client from the in-memory mock for the namespace `ns`.
        async fn new(ns: impl Into<String>) -> Self {
            let (client, meta_ctx) = mocks::mock_client_with_memstore().await;
            Self {
                ns: ns.into(),
                client,
                meta_ctx,
            }
        }

        /// Builds the namespaced key `<prefix>-<ns>-<name>`.
        fn key(&self, name: &str) -> Vec<u8> {
            format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
        }

        /// Writes `key-0..key-9` with the values `value-0..value-9`.
        async fn gen_data(&self) {
            for i in 0..10 {
                let req = PutRequest::new()
                    .with_key(self.key(&format!("key-{i}")))
                    .with_value(format!("value-{i}").into_bytes())
                    .with_prev_kv();
                let _ = self.client.put(req).await.unwrap();
            }
        }

        /// Deletes every key under this client's namespace.
        async fn clear_data(&self) {
            let req =
                DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
            let _ = self.client.delete_range(req).await.unwrap();
        }

        #[allow(dead_code)]
        fn kv_backend(&self) -> KvBackendRef {
            self.meta_ctx.kv_backend.clone()
        }

        fn in_memory(&self) -> Option<ResettableKvBackendRef> {
            self.meta_ctx.in_memory.clone()
        }
    }

    /// Creates a [`TestClient`] whose namespace has been cleared.
    async fn new_client(ns: impl Into<String>) -> TestClient {
        let client = TestClient::new(ns).await;
        client.clear_data().await;
        client
    }

    #[tokio::test]
    async fn test_meta_client_builder() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

        // Heartbeat only.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();
        let _ = meta_client.heartbeat_client().unwrap();
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // Nothing enabled.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode).build();
        assert!(meta_client.heartbeat_client().is_err());
        assert!(meta_client.store_client().is_err());
        meta_client.start(urls).await.unwrap();

        // Store only.
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        assert!(meta_client.heartbeat_client().is_err());
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();

        // Heartbeat and store, with a non-zero id.
        let mut meta_client = MetaClientBuilder::new(2, Role::Datanode)
            .enable_heartbeat()
            .enable_store()
            .build();
        assert_eq!(2, meta_client.id());
        let _ = meta_client.heartbeat_client().unwrap();
        let _ = meta_client.store_client().unwrap();
        meta_client.start(urls).await.unwrap();
    }

    #[tokio::test]
    async fn test_not_start_heartbeat_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_store()
            .build();
        meta_client.start(urls).await.unwrap();
        let res = meta_client.ask_leader().await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_not_start_store_client() {
        let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
        let mut meta_client = MetaClientBuilder::new(0, Role::Datanode)
            .enable_heartbeat()
            .build();

        meta_client.start(urls).await.unwrap();
        let res = meta_client.put(PutRequest::default()).await;
        assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
    }

    #[tokio::test]
    async fn test_ask_leader() {
        let tc = new_client("test_ask_leader").await;
        tc.client.ask_leader().await.unwrap();
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let tc = new_client("test_heartbeat").await;
        let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();

        let request_sent = Arc::new(AtomicUsize::new(0));
        let request_sent_clone = request_sent.clone();
        let send_task = tokio::spawn(async move {
            for _ in 0..5 {
                let req = HeartbeatRequest {
                    peer: Some(Peer {
                        id: 1,
                        addr: "meta_client_peer".to_string(),
                    }),
                    ..Default::default()
                };
                sender.send(req).await.unwrap();
                request_sent_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        let heartbeat_count = Arc::new(AtomicUsize::new(0));
        let heartbeat_count_clone = heartbeat_count.clone();
        let recv_task = tokio::spawn(async move {
            while let Some(_resp) = receiver.message().await.unwrap() {
                heartbeat_count_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        recv_task.await.unwrap();
        // Join the sender as well so a panic inside it fails the test instead of
        // being silently dropped with the task handle.
        send_task.await.unwrap();

        // The test expects one more response than the number of requests sent.
        assert_eq!(
            request_sent.load(Ordering::Relaxed) + 1,
            heartbeat_count.load(Ordering::Relaxed)
        );
    }

    #[tokio::test]
    async fn test_range_get() {
        let tc = new_client("test_range_get").await;
        tc.gen_data().await;

        let key = tc.key("key-0");
        let req = RangeRequest::new().with_key(key.as_slice());
        let res = tc.client.range(req).await;
        let mut kvs = res.unwrap().take_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_range_get_prefix() {
        let tc = new_client("test_range_get_prefix").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{i}")), kv.take_key());
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range() {
        let tc = new_client("test_range").await;
        tc.gen_data().await;

        let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert_eq!(format!("value-{}", i + 5).into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_range_keys_only() {
        let tc = new_client("test_range_keys_only").await;
        tc.gen_data().await;

        let req = RangeRequest::new()
            .with_range(tc.key("key-5"), tc.key("key-8"))
            .with_keys_only();
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(3, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
            assert!(kv.take_value().is_empty());
        }
    }

    #[tokio::test]
    async fn test_put() {
        let tc = new_client("test_put").await;

        let req = PutRequest::new()
            .with_key(tc.key("key"))
            .with_value(b"value".to_vec());
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());
    }

    #[tokio::test]
    async fn test_put_with_prev_kv() {
        let tc = new_client("test_put_with_prev_kv").await;

        let key = tc.key("key");
        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        assert!(res.unwrap().prev_kv.is_none());

        let req = PutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value1".to_vec())
            .with_prev_kv();
        let res = tc.client.put(req).await;
        let mut kv = res.unwrap().prev_kv.unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_batch_put() {
        let tc = new_client("test_batch_put").await;

        let mut req = BatchPutRequest::new();
        for i in 0..275 {
            req = req.add_kv(
                tc.key(&format!("key-{i}")),
                format!("value-{i}").into_bytes(),
            );
        }

        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = RangeRequest::new().with_prefix(tc.key("key-"));
        let res = tc.client.range(req).await;
        let kvs = res.unwrap().take_kvs();
        assert_eq!(275, kvs.len());
    }

    #[tokio::test]
    async fn test_batch_get() {
        let tc = new_client("test_batch_get").await;
        tc.gen_data().await;

        // Only `key-0..key-9` exist, so asking for 256 keys yields 10 hits.
        let mut req = BatchGetRequest::default();
        for i in 0..256 {
            req = req.add_key(tc.key(&format!("key-{i}")));
        }
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(10, res.kvs.len());

        let req = BatchGetRequest::default()
            .add_key(tc.key("key-1"))
            .add_key(tc.key("key-999"));
        let res = tc.client.batch_get(req).await.unwrap();
        assert_eq!(1, res.kvs.len());
    }

    #[tokio::test]
    async fn test_batch_put_with_prev_kv() {
        let tc = new_client("test_batch_put_with_prev_kv").await;

        let key = tc.key("key");
        let key2 = tc.key("key2");
        let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
        let res = tc.client.batch_put(req).await;
        assert_eq!(0, res.unwrap().take_prev_kvs().len());

        let req = BatchPutRequest::new()
            .add_kv(key.as_slice(), b"value-".to_vec())
            .add_kv(key2.as_slice(), b"value2-".to_vec())
            .with_prev_kv();
        let res = tc.client.batch_put(req).await;
        let mut kvs = res.unwrap().take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(key, kv.take_key());
        assert_eq!(b"value".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let tc = new_client("test_compare_and_put").await;

        let key = tc.key("key");

        // Key is absent but a value is expected: fails.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"expect".to_vec())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        assert!(!res.unwrap().is_success());

        // Key is absent and no expectation is given: succeeds.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_value(b"value".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());
        assert!(res.take_prev_kv().is_none());

        // Expectation does not match the stored value: fails and reports the stored value.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"not_eq".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(!res.is_success());
        assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());

        // Expectation matches the stored value: succeeds.
        let req = CompareAndPutRequest::new()
            .with_key(key.as_slice())
            .with_expect(b"value".to_vec())
            .with_value(b"value2".to_vec());
        let res = tc.client.compare_and_put(req).await;
        let mut res = res.unwrap();
        assert!(res.is_success());

        assert!(res.take_prev_kv().is_none());
    }

    #[tokio::test]
    async fn test_delete_with_key() {
        let tc = new_client("test_delete_with_key").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_key(tc.key("key-0"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(1, res.deleted());
        let mut kvs = res.take_prev_kvs();
        assert_eq!(1, kvs.len());
        let mut kv = kvs.pop().unwrap();
        assert_eq!(b"value-0".to_vec(), kv.take_value());
    }

    #[tokio::test]
    async fn test_delete_with_prefix() {
        let tc = new_client("test_delete_with_prefix").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_prefix(tc.key("key-"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(10, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(10, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("value-{i}").into_bytes(), kv.take_value());
        }
    }

    #[tokio::test]
    async fn test_delete_with_range() {
        let tc = new_client("test_delete_with_range").await;
        tc.gen_data().await;

        let req = DeleteRangeRequest::new()
            .with_range(tc.key("key-2"), tc.key("key-7"))
            .with_prev_kv();
        let res = tc.client.delete_range(req).await;
        let mut res = res.unwrap();
        assert_eq!(5, res.deleted());
        let kvs = res.take_prev_kvs();
        assert_eq!(5, kvs.len());
        for (i, mut kv) in kvs.into_iter().enumerate() {
            assert_eq!(format!("value-{}", i + 2).into_bytes(), kv.take_value());
        }
    }

    /// Decoder that discards the key-value pair; only the item count matters.
    fn mock_decoder(_kv: KeyValue) -> MetaResult<()> {
        Ok(())
    }

    #[tokio::test]
    async fn test_cluster_client_adaptive_range() {
        let tc = new_client("test_cluster_client").await;
        let in_memory = tc.in_memory().unwrap();
        let cluster_client = tc.client.cluster_client().unwrap();
        let mut rng = rand::rng();

        // Ten entries with a 1 MiB random value each.
        for i in 0..10 {
            let data: Vec<u8> = (0..1024 * 1024).map(|_| rng.random()).collect();
            in_memory
                .put(
                    PutRequest::new()
                        .with_key(format!("__prefix/{i}").as_bytes())
                        // `data` is not used afterwards, so move it instead of cloning.
                        .with_value(data),
                )
                .await
                .unwrap();
        }

        let req = RangeRequest::new().with_prefix(b"__prefix/");
        let stream =
            PaginationStream::new(Arc::new(cluster_client), req, 10, mock_decoder).into_stream();

        let res = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(10, res.len());
    }
}