1use async_trait::async_trait;
16use common_error::ext::BoxedError;
17use common_procedure::error::{
18 DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu,
19 PutStateSnafu, Result as ProcedureResult,
20};
21use common_procedure::store::poison_store::PoisonStore;
22use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
23use common_procedure::store::util::multiple_value_stream;
24use futures::StreamExt;
25use futures::future::try_join_all;
26use itertools::Itertools;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29
30use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu};
31use crate::key::txn_helper::TxnOpGetResponseSet;
32use crate::key::{DeserializedValueWithBytes, MetadataValue};
33use crate::kv_backend::KvBackendRef;
34use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
35use crate::range_stream::PaginationStream;
36use crate::rpc::KeyValue;
37use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
38
/// Path separator; leading occurrences are trimmed from the path given to `walk_top_down`.
const DELIMITER: &str = "/";

/// Prefix prepended to every procedure state key written to the backend.
const PROCEDURE_PREFIX: &str = "/__procedure__/";
/// Prefix prepended to every procedure poison key written to the backend.
const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/";

/// Returns `key` with [`PROCEDURE_PREFIX`] prepended.
fn with_prefix(key: &str) -> String {
    format!("{PROCEDURE_PREFIX}{key}")
}

/// Returns `key` with [`PROCEDURE_POISON_KEY_PREFIX`] prepended.
fn with_poison_prefix(key: &str) -> String {
    format!("{PROCEDURE_POISON_KEY_PREFIX}{key}")
}

/// Inverse of [`with_prefix`]: removes a single leading [`PROCEDURE_PREFIX`] from `key`.
///
/// Keys that do not start with the prefix are returned unchanged. Only one occurrence is
/// removed so that a logical key which itself begins with the prefix text survives a
/// `with_prefix` / `strip_prefix` round trip.
fn strip_prefix(key: &str) -> String {
    key.strip_prefix(PROCEDURE_PREFIX).unwrap_or(key).to_string()
}
55
56pub struct KvStateStore {
57 kv_backend: KvBackendRef,
58 max_num_per_range_request: Option<usize>,
61 max_value_size: Option<usize>,
64}
65
66impl KvStateStore {
67 pub fn new(kv_backend: KvBackendRef) -> Self {
68 Self {
69 kv_backend,
70 max_num_per_range_request: None,
71 max_value_size: None,
72 }
73 }
74
75 pub fn with_max_value_size(mut self, max_value_size: Option<usize>) -> Self {
80 self.max_value_size = max_value_size;
81 self
82 }
83}
84
85fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
86 let key = String::from_utf8_lossy(&kv.key);
87 let key = strip_prefix(&key);
88 let value = kv.value;
89
90 Ok((key, value))
91}
92
/// Result of [`split_value`].
enum SplitValue {
    /// The value fits within the limit (or no usable limit was given) and is kept whole.
    Single(Vec<u8>),
    /// The value exceeded the limit and was cut into consecutive segments, in order.
    Multiple(Vec<Vec<u8>>),
}

/// Splits `value` into segments of at most `max_value_size` bytes.
///
/// Returns [`SplitValue::Single`] when `max_value_size` is `None`, when the value already
/// fits, or when the limit is `0`. A zero limit cannot hold any data, so it is treated as
/// "no splitting" instead of panicking while chunking.
///
/// Otherwise returns [`SplitValue::Multiple`]; every segment except possibly the last is
/// exactly `max_value_size` bytes long.
fn split_value(value: Vec<u8>, max_value_size: Option<usize>) -> SplitValue {
    match max_value_size {
        Some(limit) if limit > 0 && value.len() > limit => {
            // Slice chunking copies each segment in bulk rather than moving bytes one by one.
            let segments = value.as_slice().chunks(limit).map(<[u8]>::to_vec).collect();
            SplitValue::Multiple(segments)
        }
        _ => SplitValue::Single(value),
    }
}
113
114#[async_trait]
115impl StateStore for KvStateStore {
116 async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
117 let split = split_value(value, self.max_value_size);
118 let key = with_prefix(key);
119 match split {
120 SplitValue::Single(value) => {
121 self.kv_backend
122 .put(
123 PutRequest::new()
124 .with_key(key.clone().into_bytes())
125 .with_value(value),
126 )
127 .await
128 .map_err(BoxedError::new)
129 .context(PutStateSnafu { key })?;
130 Ok(())
131 }
132 SplitValue::Multiple(values) => {
133 let operations = values
142 .into_iter()
143 .enumerate()
144 .map(|(idx, value)| {
145 let key = if idx > 0 {
146 KeySet::with_segment_suffix(&key, idx)
147 } else {
148 key.clone()
149 };
150 let kv_backend = self.kv_backend.clone();
151 async move {
152 kv_backend
153 .put(
154 PutRequest::new()
155 .with_key(key.into_bytes())
156 .with_value(value),
157 )
158 .await
159 }
160 })
161 .collect::<Vec<_>>();
162
163 try_join_all(operations)
164 .await
165 .map_err(BoxedError::new)
166 .context(PutStateSnafu { key })?;
167
168 Ok(())
169 }
170 }
171 }
172
173 async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
174 let path = path.to_string();
176
177 let key = with_prefix(path.trim_start_matches(DELIMITER)).into_bytes();
178 let req = RangeRequest::new().with_prefix(key);
179
180 let stream = PaginationStream::new(
181 self.kv_backend.clone(),
182 req,
183 self.max_num_per_range_request.unwrap_or_default(),
184 decode_kv,
185 )
186 .into_stream();
187
188 let stream = stream.map(move |r| {
189 let path = path.clone();
190 r.map_err(BoxedError::new)
191 .with_context(|_| ListStateSnafu { path })
192 });
193
194 let stream = multiple_value_stream(Box::pin(stream));
195
196 Ok(Box::pin(stream))
197 }
198
199 async fn batch_delete(&self, keys: &[String]) -> ProcedureResult<()> {
200 let _ = self
201 .kv_backend
202 .batch_delete(BatchDeleteRequest {
203 keys: keys
204 .iter()
205 .map(|x| with_prefix(x).into_bytes())
206 .collect::<Vec<_>>(),
207 ..Default::default()
208 })
209 .await
210 .map_err(BoxedError::new)
211 .with_context(|_| DeleteStatesSnafu {
212 keys: format!("{:?}", keys.to_vec()),
213 })?;
214 Ok(())
215 }
216
217 async fn delete(&self, key: &str) -> ProcedureResult<()> {
218 self.batch_delete(&[key.to_string()]).await
219 }
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct PoisonValue {
229 token: String,
230}
231
232type PoisonDecodeResult = Result<Option<DeserializedValueWithBytes<PoisonValue>>>;
233
234impl KvStateStore {
235 fn build_create_poison_txn(
238 &self,
239 key: &str,
240 value: &PoisonValue,
241 ) -> Result<(
242 Txn,
243 impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult + use<>,
244 )> {
245 let key = key.as_bytes().to_vec();
246 let value = value.try_as_raw_value()?;
247 let txn = Txn::put_if_not_exists(key.clone(), value);
248
249 Ok((
250 txn,
251 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
252 ))
253 }
254
255 fn build_delete_poison_txn(
258 &self,
259 key: &str,
260 value: PoisonValue,
261 ) -> Result<(
262 Txn,
263 impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult + use<>,
264 )> {
265 let key = key.as_bytes().to_vec();
266 let value = value.try_as_raw_value()?;
267
268 let txn = Txn::new()
269 .when(vec![Compare::with_value(
270 key.clone(),
271 CompareOp::Equal,
272 value,
273 )])
274 .and_then(vec![TxnOp::Delete(key.clone())])
275 .or_else(vec![TxnOp::Get(key.clone())]);
276
277 Ok((
278 txn,
279 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
280 ))
281 }
282
283 async fn get_poison_inner(&self, key: &str) -> Result<Option<PoisonValue>> {
284 let key = with_poison_prefix(key);
285 let value = self.kv_backend.get(key.as_bytes()).await?;
286 value
287 .map(|v| PoisonValue::try_from_raw_value(&v.value))
288 .transpose()
289 }
290
291 async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> {
295 let key = with_poison_prefix(key);
296 let (txn, on_failure) = self.build_create_poison_txn(
297 &key,
298 &PoisonValue {
299 token: token.to_string(),
300 },
301 )?;
302
303 let mut resp = self.kv_backend.txn(txn).await?;
304
305 if !resp.succeeded {
306 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
307 let remote_value = on_failure(&mut set)?
308 .context(UnexpectedSnafu {
309 err_msg: "Reads the empty poison value in comparing operation of the put consistency poison",
310 })?
311 .into_inner();
312
313 ensure!(
314 remote_value.token == token,
315 ProcedurePoisonConflictSnafu {
316 key: &key,
317 value: &remote_value.token,
318 }
319 );
320 }
321
322 Ok(())
323 }
324
325 async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> {
329 let key = with_poison_prefix(key);
330 let (txn, on_failure) = self.build_delete_poison_txn(
331 &key,
332 PoisonValue {
333 token: token.to_string(),
334 },
335 )?;
336
337 let mut resp = self.kv_backend.txn(txn).await?;
338
339 if !resp.succeeded {
340 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
341 let remote_value = on_failure(&mut set)?;
342
343 ensure!(
344 remote_value.is_none(),
345 ProcedurePoisonConflictSnafu {
346 key: &key,
347 value: &remote_value.unwrap().into_inner().token,
348 }
349 );
350 }
351
352 Ok(())
353 }
354}
355
356#[async_trait]
357impl PoisonStore for KvStateStore {
358 async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> {
359 self.set_poison_inner(&key, &token)
360 .await
361 .map_err(BoxedError::new)
362 .context(PutPoisonSnafu { key, token })
363 }
364
365 async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> {
366 self.delete_poison_inner(&key, &token)
367 .await
368 .map_err(BoxedError::new)
369 .context(DeletePoisonSnafu { key, token })
370 }
371
372 async fn get_poison(&self, key: &str) -> ProcedureResult<Option<String>> {
373 self.get_poison_inner(key)
374 .await
375 .map(|v| v.map(|v| v.token))
376 .map_err(BoxedError::new)
377 .context(GetPoisonSnafu { key })
378 }
379}
380
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::{assert_matches, env};

    use common_procedure::store::state_store::KeyValue;
    use common_telemetry::info;
    use futures::TryStreamExt;
    use rand::{Rng, RngCore};
    use uuid::Uuid;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::chroot::ChrootKvBackend;
    use crate::kv_backend::etcd::EtcdStore;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Collects everything `walk_top_down(path)` yields, sorted by key.
    async fn sorted_walk(store: &KvStateStore, path: &str) -> Vec<KeyValue> {
        let mut entries = store
            .walk_top_down(path)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        entries.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        entries
    }

    /// Exercises put, prefix listing and batch delete with a page size of one.
    #[tokio::test]
    async fn test_meta_state_store() {
        let store = KvStateStore {
            kv_backend: Arc::new(MemoryKvBackend::new()),
            max_num_per_range_request: Some(1),
            max_value_size: None,
        };

        assert!(sorted_walk(&store, "/").await.is_empty());

        store.put("a/1", b"v1".to_vec()).await.unwrap();
        store.put("a/2", b"v2".to_vec()).await.unwrap();
        store.put("b/1", b"v3".to_vec()).await.unwrap();

        let everything: Vec<KeyValue> = vec![
            ("a/1".into(), b"v1".to_vec()),
            ("a/2".into(), b"v2".to_vec()),
            ("b/1".into(), b"v3".to_vec()),
        ];
        assert_eq!(everything, sorted_walk(&store, "/").await);

        let under_a: Vec<KeyValue> = vec![
            ("a/1".into(), b"v1".to_vec()),
            ("a/2".into(), b"v2".to_vec()),
        ];
        assert_eq!(under_a, sorted_walk(&store, "a/").await);

        store
            .batch_delete(&["a/2".to_string(), "b/1".to_string()])
            .await
            .unwrap();

        let remaining: Vec<KeyValue> = vec![("a/1".into(), b"v1".to_vec())];
        assert_eq!(remaining, sorted_walk(&store, "a/").await);
    }

    /// One randomly sized value stored under its own single-letter prefix.
    struct TestCase {
        prefix: String,
        key: String,
        value: Vec<u8>,
    }

    /// Stores 1 to 8 random values of `size_limit..=max_bytes` bytes with a value size
    /// limit of `size_limit`, then checks that each one is listed back whole with the
    /// expected number of segment keys and that deleting those keys empties the prefix.
    async fn test_meta_state_store_split_value_with_size_limit(
        kv_backend: KvBackendRef,
        size_limit: u32,
        num_per_range: u32,
        max_bytes: u32,
    ) {
        let num_cases: usize = rand::rng().random_range(1..=8);
        info!("num_cases: {}", num_cases);

        let mut cases = Vec::with_capacity(num_cases);
        for i in 0..num_cases {
            let size = rand::rng().random_range(size_limit..=max_bytes);
            let mut payload = vec![0u8; size as usize];
            rand::rng().fill_bytes(&mut payload);

            // Prefixes are "a/", "b/", ... so every case lives under its own prefix.
            let prefix = format!("{}/", char::from(b'a' + i as u8));
            let key = format!("{}{}.commit", prefix, Uuid::new_v4());
            cases.push(TestCase {
                prefix,
                key,
                value: payload,
            });
        }

        let store = KvStateStore {
            kv_backend: kv_backend.clone(),
            max_num_per_range_request: Some(num_per_range as usize),
            max_value_size: Some(size_limit as usize),
        };

        for case in &cases {
            info!("put key: {}, size: {}", case.key, case.value.len());
            store.put(&case.key, case.value.clone()).await.unwrap();
        }

        for case in &cases {
            let listed = sorted_walk(&store, &case.prefix).await;
            assert_eq!(listed.len(), 1);
            let (keyset, got) = listed.into_iter().next().unwrap();
            info!("get key: {}", keyset.key());
            let num_expected_keys = case.value.len().div_ceil(size_limit as usize);
            assert_eq!(&got, &case.value);
            assert_eq!(keyset.key(), &case.key);
            assert_eq!(keyset.keys().len(), num_expected_keys);
        }

        for case in &cases {
            let listed = sorted_walk(&store, &case.prefix).await;
            let (keyset, _) = listed.into_iter().next().unwrap();
            store.batch_delete(keyset.keys().as_slice()).await.unwrap();

            let after_delete = sorted_walk(&store, &case.prefix).await;
            assert_eq!(after_delete.len(), 0);
        }
    }

    #[tokio::test]
    async fn test_meta_state_store_split_value() {
        let size_limit = rand::rng().random_range(128..=512);
        let page_size = rand::rng().random_range(1..10);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
            .await;
    }

    /// Runs the split-value scenario against etcd when `GT_ETCD_ENDPOINTS` is set,
    /// and against an in-memory backend otherwise.
    #[tokio::test]
    async fn test_etcd_store_split_value() {
        common_telemetry::init_default_ut_logging();
        let prefix = "test_etcd_store_split_value/";
        let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        let kv_backend: KvBackendRef = if !endpoints.is_empty() {
            let endpoints = endpoints
                .split(',')
                .map(str::to_string)
                .collect::<Vec<String>>();
            let backend = EtcdStore::with_endpoints(endpoints, 128)
                .await
                .expect("malformed endpoints");
            let chroot = format!("{}{}", prefix, Uuid::new_v4());
            info!("chroot length: {}", chroot.len());
            Arc::new(ChrootKvBackend::new(chroot.into(), backend))
        } else {
            info!("Using MemoryKvBackend");
            Arc::new(MemoryKvBackend::new())
        };

        let key_size = 1024;
        let size_limit = 1536 * 1024 - key_size;
        let page_size = rand::rng().random_range(1..10);
        let max_bytes = size_limit * 10;
        test_meta_state_store_split_value_with_size_limit(
            kv_backend, size_limit, page_size, max_bytes,
        )
        .await;
    }

    /// Setting and deleting a poison twice with the same token succeeds both times.
    #[tokio::test]
    async fn test_poison() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let poison_manager = KvStateStore::new(mem_kv.clone());

        let key = "table/1";
        let token = "expected_token";

        for _ in 0..2 {
            poison_manager.set_poison_inner(key, token).await.unwrap();
        }

        for _ in 0..2 {
            poison_manager
                .delete_poison_inner(key, token)
                .await
                .unwrap();
        }
    }

    /// Setting or deleting a poison with a different token reports a conflict.
    #[tokio::test]
    async fn test_consistency_poison_failed() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let poison_manager = KvStateStore::new(mem_kv.clone());

        let key = "table/1";
        let token = "expected_token";
        let token2 = "expected_token2";

        poison_manager.set_poison_inner(key, token).await.unwrap();

        let set_err = poison_manager
            .set_poison_inner(key, token2)
            .await
            .unwrap_err();
        assert_matches!(set_err, Error::ProcedurePoisonConflict { .. });

        let delete_err = poison_manager
            .delete_poison_inner(key, token2)
            .await
            .unwrap_err();
        assert_matches!(delete_err, Error::ProcedurePoisonConflict { .. });
    }

    /// Pins the on-wire form of a poison key and of a serialized `PoisonValue`.
    #[test]
    fn test_serialize_deserialize() {
        let key = "table/1";
        let value = PoisonValue {
            token: "expected_token".to_string(),
        };

        let serialized_key = with_poison_prefix(key).into_bytes();
        let serialized_value = value.try_as_raw_value().unwrap();

        let expected_key = "/__procedure_poison/table/1";
        let expected_value = r#"{"token":"expected_token"}"#;

        assert_eq!(expected_key.as_bytes(), serialized_key);
        assert_eq!(expected_value.as_bytes(), serialized_value);
    }
}