common_meta/
state_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_error::ext::BoxedError;
17use common_procedure::error::{
18    DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu,
19    PutStateSnafu, Result as ProcedureResult,
20};
21use common_procedure::store::poison_store::PoisonStore;
22use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
23use common_procedure::store::util::multiple_value_stream;
24use futures::future::try_join_all;
25use futures::StreamExt;
26use itertools::Itertools;
27use serde::{Deserialize, Serialize};
28use snafu::{ensure, OptionExt, ResultExt};
29
30use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu};
31use crate::key::txn_helper::TxnOpGetResponseSet;
32use crate::key::{DeserializedValueWithBytes, MetadataValue};
33use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::PaginationStream;
36use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
37use crate::rpc::KeyValue;
38
/// Path delimiter; leading occurrences are trimmed from paths passed to `walk_top_down`.
const DELIMITER: &str = "/";

/// Prefix prepended to every procedure state key stored in the kv backend.
const PROCEDURE_PREFIX: &str = "/__procedure__/";
/// Prefix prepended to every procedure poison key stored in the kv backend.
const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/";

/// Returns `key` with [`PROCEDURE_PREFIX`] prepended.
fn with_prefix(key: &str) -> String {
    format!("{PROCEDURE_PREFIX}{key}")
}

/// Returns `key` with [`PROCEDURE_POISON_KEY_PREFIX`] prepended.
fn with_poison_prefix(key: &str) -> String {
    format!("{PROCEDURE_POISON_KEY_PREFIX}{key}")
}

/// Removes one leading [`PROCEDURE_PREFIX`] from `key`; the inverse of [`with_prefix`].
///
/// Exactly one occurrence is removed, so a state key that itself begins with the
/// prefix survives the round trip `strip_prefix(&with_prefix(k)) == k`.
/// Keys that do not carry the prefix are returned unchanged.
fn strip_prefix(key: &str) -> String {
    key.strip_prefix(PROCEDURE_PREFIX).unwrap_or(key).to_string()
}
55
56pub struct KvStateStore {
57    kv_backend: KvBackendRef,
58    // The max num of keys to be returned in a range scan request
59    // `None` stands for no limit.
60    max_num_per_range_request: Option<usize>,
61    // The max bytes of value.
62    // `None` stands for no limit.
63    max_value_size: Option<usize>,
64}
65
66impl KvStateStore {
67    pub fn new(kv_backend: KvBackendRef) -> Self {
68        Self {
69            kv_backend,
70            max_num_per_range_request: None,
71            max_value_size: None,
72        }
73    }
74
75    /// Sets the `max_value_size`. `None` stands for no limit.
76    ///
77    /// If a value is larger than the `max_value_size`,
78    /// the [`KvStateStore`] will automatically split the large value into multiple values.
79    pub fn with_max_value_size(mut self, max_value_size: Option<usize>) -> Self {
80        self.max_value_size = max_value_size;
81        self
82    }
83}
84
85fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
86    let key = String::from_utf8_lossy(&kv.key);
87    let key = strip_prefix(&key);
88    let value = kv.value;
89
90    Ok((key, value))
91}
92
/// The outcome of splitting a value against the configured `max_value_size`.
enum SplitValue {
    /// The value is stored as-is (it fits, or there is no limit).
    Single(Vec<u8>),
    /// The value exceeds the limit and is stored as consecutive segments, each at
    /// most `max_value_size` bytes long; concatenated in order they reproduce it.
    Multiple(Vec<Vec<u8>>),
}

/// Splits `value` into segments of at most `max_value_size` bytes.
///
/// Returns [`SplitValue::Single`] when:
/// - there is no limit (`None`);
/// - the limit is `Some(0)`, which is treated as "no limit" because a zero-sized
///   segment can never hold data (splitting by zero would otherwise panic);
/// - the value already fits within the limit.
///
/// Otherwise returns [`SplitValue::Multiple`].
fn split_value(value: Vec<u8>, max_value_size: Option<usize>) -> SplitValue {
    match max_value_size {
        Some(limit) if limit > 0 && value.len() > limit => {
            // `slice::chunks` copies whole segments instead of re-collecting byte by byte.
            SplitValue::Multiple(value.chunks(limit).map(<[u8]>::to_vec).collect())
        }
        _ => SplitValue::Single(value),
    }
}
113
114#[async_trait]
115impl StateStore for KvStateStore {
116    async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
117        let split = split_value(value, self.max_value_size);
118        let key = with_prefix(key);
119        match split {
120            SplitValue::Single(value) => {
121                self.kv_backend
122                    .put(
123                        PutRequest::new()
124                            .with_key(key.to_string().into_bytes())
125                            .with_value(value),
126                    )
127                    .await
128                    .map_err(BoxedError::new)
129                    .context(PutStateSnafu { key })?;
130                Ok(())
131            }
132            SplitValue::Multiple(values) => {
133                // Note:
134                // The length of values can be up to usize::MAX.
135                // The KeySet::with_segment_suffix method uses a 10-digit number to store the segment number,
136                // which is large enough for the usize type.
137
138                // The first segment key: "0b00001111"
139                // The 2nd segment key: "0b00001111/0000000001"
140                // The 3rd segment key: "0b00001111/0000000002"
141                let operations = values
142                    .into_iter()
143                    .enumerate()
144                    .map(|(idx, value)| {
145                        let key = if idx > 0 {
146                            KeySet::with_segment_suffix(&key, idx)
147                        } else {
148                            key.to_string()
149                        };
150                        let kv_backend = self.kv_backend.clone();
151                        async move {
152                            kv_backend
153                                .put(
154                                    PutRequest::new()
155                                        .with_key(key.into_bytes())
156                                        .with_value(value),
157                                )
158                                .await
159                        }
160                    })
161                    .collect::<Vec<_>>();
162
163                try_join_all(operations)
164                    .await
165                    .map_err(BoxedError::new)
166                    .context(PutStateSnafu { key })?;
167
168                Ok(())
169            }
170        }
171    }
172
173    async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
174        // extend their lifetimes to be used in the stream
175        let path = path.to_string();
176
177        let key = with_prefix(path.trim_start_matches(DELIMITER)).into_bytes();
178        let req = RangeRequest::new().with_prefix(key);
179
180        let stream = PaginationStream::new(
181            self.kv_backend.clone(),
182            req,
183            self.max_num_per_range_request.unwrap_or_default(),
184            decode_kv,
185        )
186        .into_stream();
187
188        let stream = stream.map(move |r| {
189            let path = path.clone();
190            r.map_err(BoxedError::new)
191                .with_context(|_| ListStateSnafu { path })
192        });
193
194        let stream = multiple_value_stream(Box::pin(stream));
195
196        Ok(Box::pin(stream))
197    }
198
199    async fn batch_delete(&self, keys: &[String]) -> ProcedureResult<()> {
200        let _ = self
201            .kv_backend
202            .batch_delete(BatchDeleteRequest {
203                keys: keys
204                    .iter()
205                    .map(|x| with_prefix(x).into_bytes())
206                    .collect::<Vec<_>>(),
207                ..Default::default()
208            })
209            .await
210            .map_err(BoxedError::new)
211            .with_context(|_| DeleteStatesSnafu {
212                keys: format!("{:?}", keys.to_vec()),
213            })?;
214        Ok(())
215    }
216
217    async fn delete(&self, key: &str) -> ProcedureResult<()> {
218        self.batch_delete(&[key.to_string()]).await
219    }
220}
221
222/// The value of the poison key.
223///
224/// Each poison value contains a unique token that identifies the procedure.
225/// While multiple procedures may use the same poison key (representing the same resource),
226/// each procedure will have a distinct token value to differentiate its ownership.
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct PoisonValue {
229    token: String,
230}
231
232type PoisonDecodeResult = Result<Option<DeserializedValueWithBytes<PoisonValue>>>;
233
234impl KvStateStore {
235    /// Builds a create poison transaction,
236    /// it expected the `__procedure_poison/{key}` wasn't occupied.
237    fn build_create_poison_txn(
238        &self,
239        key: &str,
240        value: &PoisonValue,
241    ) -> Result<(
242        Txn,
243        impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
244    )> {
245        let key = key.as_bytes().to_vec();
246        let value = value.try_as_raw_value()?;
247        let txn = Txn::put_if_not_exists(key.clone(), value);
248
249        Ok((
250            txn,
251            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
252        ))
253    }
254
255    /// Builds a delete poison transaction,
256    /// it expected the `__procedure_poison/{key}` was occupied.
257    fn build_delete_poison_txn(
258        &self,
259        key: &str,
260        value: PoisonValue,
261    ) -> Result<(
262        Txn,
263        impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
264    )> {
265        let key = key.as_bytes().to_vec();
266        let value = value.try_as_raw_value()?;
267
268        let txn = Txn::new()
269            .when(vec![Compare::with_value(
270                key.clone(),
271                CompareOp::Equal,
272                value,
273            )])
274            .and_then(vec![TxnOp::Delete(key.clone())])
275            .or_else(vec![TxnOp::Get(key.clone())]);
276
277        Ok((
278            txn,
279            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
280        ))
281    }
282
283    async fn get_poison_inner(&self, key: &str) -> Result<Option<PoisonValue>> {
284        let key = with_poison_prefix(key);
285        let value = self.kv_backend.get(key.as_bytes()).await?;
286        value
287            .map(|v| PoisonValue::try_from_raw_value(&v.value))
288            .transpose()
289    }
290
291    /// Put the poison.
292    ///
293    /// If the poison is already put by other procedure, it will return an error.
294    async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> {
295        let key = with_poison_prefix(key);
296        let (txn, on_failure) = self.build_create_poison_txn(
297            &key,
298            &PoisonValue {
299                token: token.to_string(),
300            },
301        )?;
302
303        let mut resp = self.kv_backend.txn(txn).await?;
304
305        if !resp.succeeded {
306            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
307            let remote_value = on_failure(&mut set)?
308                .context(UnexpectedSnafu {
309                    err_msg: "Reads the empty poison value in comparing operation of the put consistency poison",
310                })?
311                .into_inner();
312
313            ensure!(
314                remote_value.token == token,
315                ProcedurePoisonConflictSnafu {
316                    key: &key,
317                    value: &remote_value.token,
318                }
319            );
320        }
321
322        Ok(())
323    }
324
325    /// Deletes the poison.
326    ///
327    /// If the poison is not put by the procedure, it will return an error.
328    async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> {
329        let key = with_poison_prefix(key);
330        let (txn, on_failure) = self.build_delete_poison_txn(
331            &key,
332            PoisonValue {
333                token: token.to_string(),
334            },
335        )?;
336
337        let mut resp = self.kv_backend.txn(txn).await?;
338
339        if !resp.succeeded {
340            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
341            let remote_value = on_failure(&mut set)?;
342
343            ensure!(
344                remote_value.is_none(),
345                ProcedurePoisonConflictSnafu {
346                    key: &key,
347                    value: &remote_value.unwrap().into_inner().token,
348                }
349            );
350        }
351
352        Ok(())
353    }
354}
355
356#[async_trait]
357impl PoisonStore for KvStateStore {
358    async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> {
359        self.set_poison_inner(&key, &token)
360            .await
361            .map_err(BoxedError::new)
362            .context(PutPoisonSnafu { key, token })
363    }
364
365    async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> {
366        self.delete_poison_inner(&key, &token)
367            .await
368            .map_err(BoxedError::new)
369            .context(DeletePoisonSnafu { key, token })
370    }
371
372    async fn get_poison(&self, key: &str) -> ProcedureResult<Option<String>> {
373        self.get_poison_inner(key)
374            .await
375            .map(|v| v.map(|v| v.token))
376            .map_err(BoxedError::new)
377            .context(GetPoisonSnafu { key })
378    }
379}
380
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::env;
    use std::sync::Arc;

    use common_procedure::store::state_store::KeyValue;
    use common_telemetry::info;
    use futures::TryStreamExt;
    use rand::{Rng, RngCore};
    use uuid::Uuid;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::chroot::ChrootKvBackend;
    use crate::kv_backend::etcd::EtcdStore;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Collects every entry under `path` from `store`, ordered by key.
    async fn walk_sorted(store: &KvStateStore, path: &str) -> Vec<KeyValue> {
        let stream = store.walk_top_down(path).await.unwrap();
        let mut entries: Vec<KeyValue> = stream.try_collect().await.unwrap();
        entries.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        entries
    }

    #[tokio::test]
    async fn test_meta_state_store() {
        let store = KvStateStore {
            kv_backend: Arc::new(MemoryKvBackend::new()),
            // A page size of one forces the range scan through its "more" pages.
            max_num_per_range_request: Some(1),
            max_value_size: None,
        };

        assert!(walk_sorted(&store, "/").await.is_empty());

        for (key, value) in [("a/1", b"v1"), ("a/2", b"v2"), ("b/1", b"v3")] {
            store.put(key, value.to_vec()).await.unwrap();
        }

        let everything = walk_sorted(&store, "/").await;
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
                ("b/1".into(), b"v3".to_vec()),
            ],
            everything
        );

        let under_a = walk_sorted(&store, "a/").await;
        assert_eq!(
            vec![
                ("a/1".into(), b"v1".to_vec()),
                ("a/2".into(), b"v2".to_vec()),
            ],
            under_a
        );

        store
            .batch_delete(&["a/2".to_string(), "b/1".to_string()])
            .await
            .unwrap();

        let remaining = walk_sorted(&store, "a/").await;
        assert_eq!(vec![("a/1".into(), b"v1".to_vec())], remaining);
    }

    struct TestCase {
        prefix: String,
        key: String,
        value: Vec<u8>,
    }

    async fn test_meta_state_store_split_value_with_size_limit(
        kv_backend: KvBackendRef,
        size_limit: u32,
        num_per_range: u32,
        max_bytes: u32,
    ) {
        let num_cases: usize = rand::rng().random_range(1..=8);
        common_telemetry::info!("num_cases: {}", num_cases);
        let cases: Vec<TestCase> = (0..num_cases)
            .map(|i| {
                let size = rand::rng().random_range(size_limit..=max_bytes);
                let mut value = vec![0u8; size as usize];
                rand::rng().fill_bytes(&mut value);

                // Each case lives under its own single-letter directory, starting from `a`.
                let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
                let key = format!("{}{}.commit", prefix, Uuid::new_v4());
                TestCase { prefix, key, value }
            })
            .collect();

        let store = KvStateStore {
            kv_backend,
            // Small pages exercise the "more" pagination path of the range scan.
            max_num_per_range_request: Some(num_per_range as usize),
            max_value_size: Some(size_limit as usize),
        };

        // Puts the values.
        for case in &cases {
            common_telemetry::info!("put key: {}, size: {}", case.key, case.value.len());
            store.put(&case.key, case.value.clone()).await.unwrap();
        }

        // Validates that each value reads back whole and was split into the expected
        // number of segments.
        for case in &cases {
            let mut entries = walk_sorted(&store, &case.prefix).await;
            assert_eq!(entries.len(), 1);
            let (keyset, got) = entries.remove(0);
            common_telemetry::info!("get key: {}", keyset.key());
            let num_expected_keys = case.value.len().div_ceil(size_limit as usize);
            assert_eq!(&got, &case.value);
            assert_eq!(keyset.key(), &case.key);
            assert_eq!(keyset.keys().len(), num_expected_keys);
        }

        // Deletes every segment of each value and checks nothing is left behind.
        for case in &cases {
            let (keyset, _) = walk_sorted(&store, &case.prefix).await.remove(0);
            store.batch_delete(keyset.keys().as_slice()).await.unwrap();
            assert!(walk_sorted(&store, &case.prefix).await.is_empty());
        }
    }

    #[tokio::test]
    async fn test_meta_state_store_split_value() {
        let size_limit = rand::rng().random_range(128..=512);
        let page_size = rand::rng().random_range(1..10);
        test_meta_state_store_split_value_with_size_limit(
            Arc::new(MemoryKvBackend::new()),
            size_limit,
            page_size,
            8192,
        )
        .await;
    }

    #[tokio::test]
    async fn test_etcd_store_split_value() {
        common_telemetry::init_default_ut_logging();
        let prefix = "test_etcd_store_split_value/";
        let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
        let kv_backend: KvBackendRef = if endpoints.is_empty() {
            common_telemetry::info!("Using MemoryKvBackend");
            Arc::new(MemoryKvBackend::new())
        } else {
            let endpoints: Vec<String> = endpoints.split(',').map(String::from).collect();
            let backend = EtcdStore::with_endpoints(endpoints, 128)
                .await
                .expect("malformed endpoints");
            // Each retry requires a new isolation namespace.
            let chroot = format!("{}{}", prefix, Uuid::new_v4());
            info!("chroot length: {}", chroot.len());
            Arc::new(ChrootKvBackend::new(chroot.into(), backend))
        };

        // etcd's default limit for any single request is 1.5MiB. Some KvBackends, such as
        // `ChrootKvBackend`, prepend a prefix to each key, so the exact key size is unknown.
        // Reserve `key_size` bytes of headroom for it.
        let key_size = 1024;
        let size_limit = 1536 * 1024 - key_size;
        let page_size = rand::rng().random_range(1..10);
        test_meta_state_store_split_value_with_size_limit(
            kv_backend,
            size_limit,
            page_size,
            size_limit * 10,
        )
        .await;
    }

    #[tokio::test]
    async fn test_poison() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let poison_manager = KvStateStore::new(mem_kv.clone());
        let key = "table/1";
        let token = "expected_token";

        // Putting the same poison twice is fine.
        for _ in 0..2 {
            poison_manager.set_poison_inner(key, token).await.unwrap();
        }

        // Deleting it twice is fine as well.
        for _ in 0..2 {
            poison_manager
                .delete_poison_inner(key, token)
                .await
                .unwrap();
        }
    }

    #[tokio::test]
    async fn test_consistency_poison_failed() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let poison_manager = KvStateStore::new(mem_kv.clone());
        let key = "table/1";
        let (token, other_token) = ("expected_token", "expected_token2");

        poison_manager.set_poison_inner(key, token).await.unwrap();

        // Another token can neither take over the poison...
        let put_err = poison_manager
            .set_poison_inner(key, other_token)
            .await
            .unwrap_err();
        assert_matches!(put_err, Error::ProcedurePoisonConflict { .. });

        // ...nor remove it.
        let delete_err = poison_manager
            .delete_poison_inner(key, other_token)
            .await
            .unwrap_err();
        assert_matches!(delete_err, Error::ProcedurePoisonConflict { .. });
    }

    #[test]
    fn test_serialize_deserialize() {
        let poison = PoisonValue {
            token: "expected_token".to_string(),
        };

        let serialized_key = with_poison_prefix("table/1").into_bytes();
        let serialized_value = poison.try_as_raw_value().unwrap();

        assert_eq!("/__procedure_poison/table/1".as_bytes(), serialized_key);
        assert_eq!(
            r#"{"token":"expected_token"}"#.as_bytes(),
            serialized_value
        );
    }
}