1use async_trait::async_trait;
16use common_error::ext::BoxedError;
17use common_procedure::error::{
18 DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu,
19 PutStateSnafu, Result as ProcedureResult,
20};
21use common_procedure::store::poison_store::PoisonStore;
22use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
23use common_procedure::store::util::multiple_value_stream;
24use futures::future::try_join_all;
25use futures::StreamExt;
26use itertools::Itertools;
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, OptionExt, ResultExt};
29
30use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu};
31use crate::key::txn_helper::TxnOpGetResponseSet;
32use crate::key::{DeserializedValueWithBytes, MetadataValue};
33use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::PaginationStream;
36use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
37use crate::rpc::KeyValue;
38
/// Path delimiter; leading occurrences are trimmed from the path passed to
/// `walk_top_down` before it is namespaced.
const DELIMITER: &str = "/";

/// Key prefix under which every procedure state is stored in the kv backend.
const PROCEDURE_PREFIX: &str = "/__procedure__/";
/// Key prefix under which every procedure poison is stored in the kv backend.
const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/";

/// Namespaces a procedure state `key` under [`PROCEDURE_PREFIX`].
fn with_prefix(key: &str) -> String {
    format!("{PROCEDURE_PREFIX}{key}")
}

/// Namespaces a poison `key` under [`PROCEDURE_POISON_KEY_PREFIX`].
fn with_poison_prefix(key: &str) -> String {
    format!("{PROCEDURE_POISON_KEY_PREFIX}{key}")
}

/// Inverse of [`with_prefix`]: removes exactly one leading [`PROCEDURE_PREFIX`].
///
/// Keys that do not start with the prefix are returned unchanged. Only a single
/// occurrence is removed, so a user key that itself begins with the prefix
/// survives a `with_prefix` -> `strip_prefix` round trip. `trim_start_matches`
/// would instead strip every repeated occurrence.
fn strip_prefix(key: &str) -> String {
    key.strip_prefix(PROCEDURE_PREFIX)
        .unwrap_or(key)
        .to_string()
}
55
56pub struct KvStateStore {
57 kv_backend: KvBackendRef,
58 max_num_per_range_request: Option<usize>,
61 max_value_size: Option<usize>,
64}
65
66impl KvStateStore {
67 pub fn new(kv_backend: KvBackendRef) -> Self {
68 Self {
69 kv_backend,
70 max_num_per_range_request: None,
71 max_value_size: None,
72 }
73 }
74
75 pub fn with_max_value_size(mut self, max_value_size: Option<usize>) -> Self {
80 self.max_value_size = max_value_size;
81 self
82 }
83}
84
85fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
86 let key = String::from_utf8_lossy(&kv.key);
87 let key = strip_prefix(&key);
88 let value = kv.value;
89
90 Ok((key, value))
91}
92
93enum SplitValue {
94 Single(Vec<u8>),
95 Multiple(Vec<Vec<u8>>),
96}
97
98fn split_value(value: Vec<u8>, max_value_size: Option<usize>) -> SplitValue {
99 if let Some(max_value_size) = max_value_size {
100 if value.len() <= max_value_size {
101 SplitValue::Single(value)
102 } else {
103 let mut values = vec![];
104 for chunk in value.into_iter().chunks(max_value_size).into_iter() {
105 values.push(chunk.collect());
106 }
107 SplitValue::Multiple(values)
108 }
109 } else {
110 SplitValue::Single(value)
111 }
112}
113
114#[async_trait]
115impl StateStore for KvStateStore {
116 async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
117 let split = split_value(value, self.max_value_size);
118 let key = with_prefix(key);
119 match split {
120 SplitValue::Single(value) => {
121 self.kv_backend
122 .put(
123 PutRequest::new()
124 .with_key(key.to_string().into_bytes())
125 .with_value(value),
126 )
127 .await
128 .map_err(BoxedError::new)
129 .context(PutStateSnafu { key })?;
130 Ok(())
131 }
132 SplitValue::Multiple(values) => {
133 let operations = values
142 .into_iter()
143 .enumerate()
144 .map(|(idx, value)| {
145 let key = if idx > 0 {
146 KeySet::with_segment_suffix(&key, idx)
147 } else {
148 key.to_string()
149 };
150 let kv_backend = self.kv_backend.clone();
151 async move {
152 kv_backend
153 .put(
154 PutRequest::new()
155 .with_key(key.into_bytes())
156 .with_value(value),
157 )
158 .await
159 }
160 })
161 .collect::<Vec<_>>();
162
163 try_join_all(operations)
164 .await
165 .map_err(BoxedError::new)
166 .context(PutStateSnafu { key })?;
167
168 Ok(())
169 }
170 }
171 }
172
173 async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
174 let path = path.to_string();
176
177 let key = with_prefix(path.trim_start_matches(DELIMITER)).into_bytes();
178 let req = RangeRequest::new().with_prefix(key);
179
180 let stream = PaginationStream::new(
181 self.kv_backend.clone(),
182 req,
183 self.max_num_per_range_request.unwrap_or_default(),
184 decode_kv,
185 )
186 .into_stream();
187
188 let stream = stream.map(move |r| {
189 let path = path.clone();
190 r.map_err(BoxedError::new)
191 .with_context(|_| ListStateSnafu { path })
192 });
193
194 let stream = multiple_value_stream(Box::pin(stream));
195
196 Ok(Box::pin(stream))
197 }
198
199 async fn batch_delete(&self, keys: &[String]) -> ProcedureResult<()> {
200 let _ = self
201 .kv_backend
202 .batch_delete(BatchDeleteRequest {
203 keys: keys
204 .iter()
205 .map(|x| with_prefix(x).into_bytes())
206 .collect::<Vec<_>>(),
207 ..Default::default()
208 })
209 .await
210 .map_err(BoxedError::new)
211 .with_context(|_| DeleteStatesSnafu {
212 keys: format!("{:?}", keys.to_vec()),
213 })?;
214 Ok(())
215 }
216
217 async fn delete(&self, key: &str) -> ProcedureResult<()> {
218 self.batch_delete(&[key.to_string()]).await
219 }
220}
221
/// The value stored under a procedure poison key.
///
/// Its serialized form (pinned as `{"token":"..."}` by `test_serialize_deserialize`)
/// is compared byte-for-byte by the delete transaction. The field name and shape
/// must therefore stay stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoisonValue {
    // Identifies the holder of the poison; a delete only succeeds when the stored
    // token matches the caller's token.
    token: String,
}

/// Result of decoding a transaction's get responses: the poison value read back
/// for the key, if any.
type PoisonDecodeResult = Result<Option<DeserializedValueWithBytes<PoisonValue>>>;
233
234impl KvStateStore {
235 fn build_create_poison_txn(
238 &self,
239 key: &str,
240 value: &PoisonValue,
241 ) -> Result<(
242 Txn,
243 impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
244 )> {
245 let key = key.as_bytes().to_vec();
246 let value = value.try_as_raw_value()?;
247 let txn = Txn::put_if_not_exists(key.clone(), value);
248
249 Ok((
250 txn,
251 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
252 ))
253 }
254
255 fn build_delete_poison_txn(
258 &self,
259 key: &str,
260 value: PoisonValue,
261 ) -> Result<(
262 Txn,
263 impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
264 )> {
265 let key = key.as_bytes().to_vec();
266 let value = value.try_as_raw_value()?;
267
268 let txn = Txn::new()
269 .when(vec![Compare::with_value(
270 key.clone(),
271 CompareOp::Equal,
272 value,
273 )])
274 .and_then(vec![TxnOp::Delete(key.clone())])
275 .or_else(vec![TxnOp::Get(key.clone())]);
276
277 Ok((
278 txn,
279 TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
280 ))
281 }
282
283 async fn get_poison_inner(&self, key: &str) -> Result<Option<PoisonValue>> {
284 let key = with_poison_prefix(key);
285 let value = self.kv_backend.get(key.as_bytes()).await?;
286 value
287 .map(|v| PoisonValue::try_from_raw_value(&v.value))
288 .transpose()
289 }
290
291 async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> {
295 let key = with_poison_prefix(key);
296 let (txn, on_failure) = self.build_create_poison_txn(
297 &key,
298 &PoisonValue {
299 token: token.to_string(),
300 },
301 )?;
302
303 let mut resp = self.kv_backend.txn(txn).await?;
304
305 if !resp.succeeded {
306 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
307 let remote_value = on_failure(&mut set)?
308 .context(UnexpectedSnafu {
309 err_msg: "Reads the empty poison value in comparing operation of the put consistency poison",
310 })?
311 .into_inner();
312
313 ensure!(
314 remote_value.token == token,
315 ProcedurePoisonConflictSnafu {
316 key: &key,
317 value: &remote_value.token,
318 }
319 );
320 }
321
322 Ok(())
323 }
324
325 async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> {
329 let key = with_poison_prefix(key);
330 let (txn, on_failure) = self.build_delete_poison_txn(
331 &key,
332 PoisonValue {
333 token: token.to_string(),
334 },
335 )?;
336
337 let mut resp = self.kv_backend.txn(txn).await?;
338
339 if !resp.succeeded {
340 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
341 let remote_value = on_failure(&mut set)?;
342
343 ensure!(
344 remote_value.is_none(),
345 ProcedurePoisonConflictSnafu {
346 key: &key,
347 value: &remote_value.unwrap().into_inner().token,
348 }
349 );
350 }
351
352 Ok(())
353 }
354}
355
356#[async_trait]
357impl PoisonStore for KvStateStore {
358 async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> {
359 self.set_poison_inner(&key, &token)
360 .await
361 .map_err(BoxedError::new)
362 .context(PutPoisonSnafu { key, token })
363 }
364
365 async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> {
366 self.delete_poison_inner(&key, &token)
367 .await
368 .map_err(BoxedError::new)
369 .context(DeletePoisonSnafu { key, token })
370 }
371
372 async fn get_poison(&self, key: &str) -> ProcedureResult<Option<String>> {
373 self.get_poison_inner(key)
374 .await
375 .map(|v| v.map(|v| v.token))
376 .map_err(BoxedError::new)
377 .context(GetPoisonSnafu { key })
378 }
379}
380
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::env;
    use std::sync::Arc;

    use common_procedure::store::state_store::KeyValue;
    use common_telemetry::info;
    use futures::TryStreamExt;
    use rand::{Rng, RngCore};
    use uuid::Uuid;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::chroot::ChrootKvBackend;
    use crate::kv_backend::etcd::EtcdStore;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Collects everything `walk_top_down(path)` yields, sorted by key set so that
    /// assertions do not depend on backend iteration order.
    async fn walk_sorted(store: &KvStateStore, path: &str) -> Vec<KeyValue> {
        let mut entries = store
            .walk_top_down(path)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        entries.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        entries
    }

    #[tokio::test]
    async fn test_meta_state_store() {
        let store = KvStateStore {
            kv_backend: Arc::new(MemoryKvBackend::new()),
            max_num_per_range_request: Some(1),
            max_value_size: None,
        };

        assert!(walk_sorted(&store, "/").await.is_empty());

        for (key, value) in [("a/1", b"v1"), ("a/2", b"v2"), ("b/1", b"v3")] {
            store.put(key, value.to_vec()).await.unwrap();
        }

        let everything = walk_sorted(&store, "/").await;
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
                ("b/1".into(), b"v3".to_vec())
            ],
            everything
        );

        let under_a = walk_sorted(&store, "a/").await;
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
            ],
            under_a
        );

        store
            .batch_delete(&["a/2".to_string(), "b/1".to_string()])
            .await
            .unwrap();

        let remaining = walk_sorted(&store, "a/").await;
        assert_eq!(vec![("a/1".into(), b"v1".to_vec())], remaining);
    }

    struct TestCase {
        prefix: String,
        key: String,
        value: Vec<u8>,
    }

    /// Puts 1..=8 random values of `size_limit..=max_bytes` bytes under distinct
    /// prefixes. Then checks that each one reads back whole with the expected
    /// number of segments, and that deleting all segment keys removes it.
    async fn test_meta_state_store_split_value_with_size_limit(
        kv_backend: KvBackendRef,
        size_limit: u32,
        num_per_range: u32,
        max_bytes: u32,
    ) {
        let num_cases: usize = rand::rng().random_range(1..=8);
        common_telemetry::info!("num_cases: {}", num_cases);
        let cases = (0..num_cases)
            .map(|i| {
                let size = rand::rng().random_range(size_limit..=max_bytes);
                let mut value = vec![0u8; size as usize];
                rand::rng().fill_bytes(&mut value);

                // One single-letter prefix per case: "a/", "b/", ...
                let prefix = format!("{}/", char::from_u32(97 + i as u32).unwrap());
                TestCase {
                    key: format!("{}{}.commit", prefix, Uuid::new_v4()),
                    prefix,
                    value,
                }
            })
            .collect::<Vec<_>>();

        let store = KvStateStore {
            kv_backend,
            max_num_per_range_request: Some(num_per_range as usize),
            max_value_size: Some(size_limit as usize),
        };

        for case in &cases {
            common_telemetry::info!("put key: {}, size: {}", case.key, case.value.len());
            store.put(&case.key, case.value.clone()).await.unwrap();
        }

        for case in &cases {
            let entries = walk_sorted(&store, &case.prefix).await;
            assert_eq!(entries.len(), 1);
            let (keyset, got) = entries.into_iter().next().unwrap();
            common_telemetry::info!("get key: {}", keyset.key());
            let num_expected_keys = case.value.len().div_ceil(size_limit as usize);
            assert_eq!(&got, &case.value);
            assert_eq!(keyset.key(), &case.key);
            assert_eq!(keyset.keys().len(), num_expected_keys);
        }

        for case in &cases {
            let (keyset, _) = walk_sorted(&store, &case.prefix)
                .await
                .into_iter()
                .next()
                .unwrap();
            store.batch_delete(keyset.keys().as_slice()).await.unwrap();
            assert_eq!(walk_sorted(&store, &case.prefix).await.len(), 0);
        }
    }

    #[tokio::test]
    async fn test_meta_state_store_split_value() {
        let size_limit = rand::rng().random_range(128..=512);
        let page_size = rand::rng().random_range(1..10);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
            .await;
    }

    #[tokio::test]
    async fn test_etcd_store_split_value() {
        common_telemetry::init_default_ut_logging();
        let prefix = "test_etcd_store_split_value/";
        let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        let kv_backend: KvBackendRef = if endpoints.is_empty() {
            common_telemetry::info!("Using MemoryKvBackend");
            Arc::new(MemoryKvBackend::new())
        } else {
            let endpoints: Vec<String> = endpoints.split(',').map(String::from).collect();
            let backend = EtcdStore::with_endpoints(endpoints, 128)
                .await
                .expect("malformed endpoints");
            let chroot = format!("{}{}", prefix, Uuid::new_v4());
            info!("chroot length: {}", chroot.len());
            Arc::new(ChrootKvBackend::new(chroot.into(), backend))
        };

        // Leave headroom for the key inside each stored entry.
        let key_size = 1024;
        let size_limit = 1536 * 1024 - key_size;
        let page_size = rand::rng().random_range(1..10);
        test_meta_state_store_split_value_with_size_limit(
            kv_backend,
            size_limit,
            page_size,
            size_limit * 10,
        )
        .await;
    }

    #[tokio::test]
    async fn test_poison() {
        let poison_manager = KvStateStore::new(Arc::new(MemoryKvBackend::default()));
        let key = "table/1";
        let token = "expected_token";

        // Putting the same token twice must succeed both times.
        for _ in 0..2 {
            poison_manager.set_poison_inner(key, token).await.unwrap();
        }

        // Deleting twice must also succeed: the second call finds nothing to delete.
        for _ in 0..2 {
            poison_manager
                .delete_poison_inner(key, token)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn test_consistency_poison_failed() {
        let poison_manager = KvStateStore::new(Arc::new(MemoryKvBackend::default()));
        let key = "table/1";
        let (token, other_token) = ("expected_token", "expected_token2");

        poison_manager.set_poison_inner(key, token).await.unwrap();

        // A different token is rejected both when putting and when deleting.
        let put_err = poison_manager
            .set_poison_inner(key, other_token)
            .await
            .unwrap_err();
        assert_matches!(put_err, Error::ProcedurePoisonConflict { .. });

        let delete_err = poison_manager
            .delete_poison_inner(key, other_token)
            .await
            .unwrap_err();
        assert_matches!(delete_err, Error::ProcedurePoisonConflict { .. });
    }

    #[test]
    fn test_serialize_deserialize() {
        let value = PoisonValue {
            token: "expected_token".to_string(),
        };

        assert_eq!(
            "/__procedure_poison/table/1".as_bytes(),
            with_poison_prefix("table/1").as_bytes()
        );
        assert_eq!(
            r#"{"token":"expected_token"}"#.as_bytes(),
            value.try_as_raw_value().unwrap()
        );
    }
}