common_meta/
sequence.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use common_telemetry::{debug, warn};
19use snafu::ensure;
20use tokio::sync::Mutex;
21
22use crate::error::{self, Result};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::CompareAndPutRequest;
25
/// Shared, reference-counted handle to a [`Sequence`].
pub type SequenceRef = Arc<Sequence>;

/// Prefix that `seq_name` puts in front of every sequence name to form the key
/// under which the sequence is stored in the KV backend.
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
29
/// Builder for [`Sequence`].
///
/// Collects the configuration that `build` copies into the sequence's internal state.
pub struct SequenceBuilder {
    // Key of the sequence in the KV backend (the user-supplied name run through `seq_name`).
    name: String,
    // First (minimal) value of the sequence; `new` sets it to 0.
    initial: u64,
    // Number of values reserved from the backend per fetch; `new` sets it to 1.
    step: u64,
    // KV backend in which the sequence value is stored.
    generator: KvBackendRef,
    // Upper bound of the sequence (end of the `initial..max` range); `new` sets it to `u64::MAX`.
    max: u64,
}
37
38fn seq_name(name: impl AsRef<str>) -> String {
39    format!("{}-{}", SEQ_PREFIX, name.as_ref())
40}
41
42impl SequenceBuilder {
43    pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
44        Self {
45            name: seq_name(name),
46            initial: 0,
47            step: 1,
48            generator,
49            max: u64::MAX,
50        }
51    }
52
53    pub fn initial(self, initial: u64) -> Self {
54        Self { initial, ..self }
55    }
56
57    pub fn step(self, step: u64) -> Self {
58        Self { step, ..self }
59    }
60
61    pub fn max(self, max: u64) -> Self {
62        Self { max, ..self }
63    }
64
65    pub fn build(self) -> Sequence {
66        Sequence {
67            inner: Mutex::new(Inner {
68                name: self.name,
69                generator: self.generator,
70                initial: self.initial,
71                next: self.initial,
72                step: self.step,
73                range: None,
74                force_quit: 1024,
75                max: self.max,
76            }),
77        }
78    }
79}
80
/// A `u64` sequence generator whose value is stored in a KV backend.
///
/// The state lives behind a `tokio::sync::Mutex`; every public method locks it
/// for the duration of the call.
pub struct Sequence {
    // Configuration plus the locally cached range and cursor.
    inner: Mutex<Inner>,
}
84
85impl Sequence {
86    /// Returns the next value and increments the sequence.
87    pub async fn next(&self) -> Result<u64> {
88        let mut inner = self.inner.lock().await;
89        inner.next().await
90    }
91
92    /// Returns the range of available sequences.
93    pub async fn min_max(&self) -> Range<u64> {
94        let inner = self.inner.lock().await;
95        inner.initial..inner.max
96    }
97
98    /// Returns the current value stored in the remote storage without incrementing the sequence.
99    ///
100    /// This function always fetches the true current state from the remote storage (KV backend),
101    /// ignoring any local cache to provide the most accurate view of the sequence's remote state.
102    /// It does not consume or advance the sequence value.
103    ///
104    /// Note: Since this always queries the remote storage, it may be slower than `next()` but
105    /// provides the most accurate and up-to-date information about the sequence state.
106    pub async fn peek(&self) -> Result<u64> {
107        let inner = self.inner.lock().await;
108        inner.peek().await
109    }
110
111    /// Jumps to the given value.
112    ///
113    /// The next value must be greater than both:
114    /// 1. The current local next value
115    /// 2. The current value stored in the remote storage (KV backend)
116    ///
117    /// This ensures the sequence can only move forward and maintains consistency
118    /// across different instances accessing the same sequence.
119    pub async fn jump_to(&self, next: u64) -> Result<()> {
120        let mut inner = self.inner.lock().await;
121        inner.jump_to(next).await
122    }
123}
124
/// Mutable state of a [`Sequence`], kept behind the mutex in `Sequence::inner`.
struct Inner {
    // Key of the sequence in the KV backend.
    name: String,
    // KV backend that stores the sequence value; ranges are reserved via compare-and-put.
    generator: KvBackendRef,
    // The initial (minimal) value of the sequence.
    initial: u64,
    // The next value to hand out, if it lies inside `range`;
    // otherwise a new range has to be fetched from the generator.
    next: u64,
    // Number of values fetched at once: [start, start + step).
    step: u64,
    // The range of available sequences for the local cache.
    range: Option<Range<u64>>,
    // Maximum number of loop iterations; used to avoid dead loops.
    force_quit: usize,
    // Upper bound of the sequence: a fetched range never extends past `max`.
    max: u64,
}
141
142impl Inner {
143    /// 1. returns the `next` value directly if it is in the `range` (local cache)
144    /// 2. fetch(CAS) next `range` from the `generator`
145    /// 3. jump to step 1
146    pub async fn next(&mut self) -> Result<u64> {
147        for _ in 0..self.force_quit {
148            match &self.range {
149                Some(range) => {
150                    if range.contains(&self.next) {
151                        let res = Ok(self.next);
152                        self.next += 1;
153                        debug!("sequence {} next: {}", self.name, self.next);
154                        return res;
155                    }
156                    self.range = None;
157                }
158                None => {
159                    let range = self.next_range().await?;
160                    self.next = range.start;
161                    self.range = Some(range);
162                    debug!(
163                        "sequence {} next: {}, range: {:?}",
164                        self.name, self.next, self.range
165                    );
166                }
167            }
168        }
169
170        error::NextSequenceSnafu {
171            err_msg: format!("{}.next()", &self.name),
172        }
173        .fail()
174    }
175
176    /// Returns the current value from remote storage without advancing the sequence.
177    /// If no value exists in remote storage, returns the initial value.
178    pub async fn peek(&self) -> Result<u64> {
179        let key = self.name.as_bytes();
180        let value = self.generator.get(key).await?.map(|kv| kv.value);
181        let next = if let Some(value) = value {
182            let next = self.initial.max(self.parse_sequence_value(value)?);
183            debug!("The next value of sequence {} is {}", self.name, next);
184            next
185        } else {
186            debug!(
187                "The next value of sequence {} is not set, use initial value {}",
188                self.name, self.initial
189            );
190            self.initial
191        };
192
193        Ok(next)
194    }
195
196    pub async fn next_range(&self) -> Result<Range<u64>> {
197        let key = self.name.as_bytes();
198        let mut start = self.next;
199
200        let mut expect = if start == self.initial {
201            vec![]
202        } else {
203            u64::to_le_bytes(start).to_vec()
204        };
205
206        for _ in 0..self.force_quit {
207            let step = self.step.min(self.max - start);
208
209            ensure!(
210                step > 0,
211                error::NextSequenceSnafu {
212                    err_msg: format!("next sequence exhausted, max: {}", self.max)
213                }
214            );
215
216            // No overflow: step <= self.max - start -> step + start <= self.max <= u64::MAX
217            let value = u64::to_le_bytes(start + step);
218
219            let req = CompareAndPutRequest {
220                key: key.to_vec(),
221                expect,
222                value: value.to_vec(),
223            };
224
225            let res = self.generator.compare_and_put(req).await?;
226
227            if !res.success {
228                if let Some(kv) = res.prev_kv {
229                    let v = self.parse_sequence_value(kv.value.clone())?;
230                    // If the existed value is smaller than the initial, we should start from the initial.
231                    start = v.max(self.initial);
232                    expect = kv.value;
233                } else {
234                    start = self.initial;
235                    expect = vec![];
236                }
237                continue;
238            }
239
240            return Ok(Range {
241                start,
242                end: start + step,
243            });
244        }
245
246        error::NextSequenceSnafu {
247            err_msg: format!("{}.next_range()", &self.name),
248        }
249        .fail()
250    }
251
252    /// Jumps to the given value.
253    ///
254    /// The next value must be greater than both:
255    /// 1. The current local next value (self.next)
256    /// 2. The current value stored in the remote storage (KV backend)
257    ///
258    /// This ensures the sequence can only move forward and maintains consistency
259    /// across different instances accessing the same sequence.
260    pub async fn jump_to(&mut self, next: u64) -> Result<()> {
261        let key = self.name.as_bytes();
262        let current = self.generator.get(key).await?.map(|kv| kv.value);
263
264        let curr_val = match &current {
265            Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?),
266            None => self.initial,
267        };
268
269        ensure!(
270            next > curr_val,
271            error::UnexpectedSnafu {
272                err_msg: format!(
273                    "The next value {} is not greater than the current next value {}",
274                    next, curr_val
275                ),
276            }
277        );
278
279        let expect = current.unwrap_or_default();
280
281        let req = CompareAndPutRequest {
282            key: key.to_vec(),
283            expect,
284            value: u64::to_le_bytes(next).to_vec(),
285        };
286        let res = self.generator.compare_and_put(req).await?;
287        ensure!(
288            res.success,
289            error::UnexpectedSnafu {
290                err_msg: format!("Failed to reset sequence {} to {}", self.name, next),
291            }
292        );
293        warn!("Sequence {} jumped to {}", self.name, next);
294        // Reset the sequence to the initial value.
295        self.initial = next;
296        self.next = next;
297        self.range = None;
298
299        Ok(())
300    }
301
302    /// Converts a Vec<u8> to u64 with proper error handling for sequence values
303    fn parse_sequence_value(&self, value: Vec<u8>) -> Result<u64> {
304        let v: [u8; 8] = match value.try_into() {
305            Ok(a) => a,
306            Err(v) => {
307                return error::UnexpectedSequenceValueSnafu {
308                    err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
309                }
310                .fail()
311            }
312        };
313        Ok(u64::from_le_bytes(v))
314    }
315}
316
// Unit tests for `Sequence`, mostly running against the in-memory KV backend.
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;
    use std::sync::Arc;

    use itertools::{Itertools, MinMaxResult};
    use tokio::sync::mpsc;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::rpc::store::{
        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
        BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest,
        DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
    };

    // A value that already exists in the backend is honoured, but never below `initial`.
    #[tokio::test]
    async fn test_sequence_with_existed_value() {
        // Pre-populates the backend with `exist`, then checks the first values handed out
        // by a sequence whose initial value is 100.
        async fn test(exist: u64, expected: Vec<u64>) {
            let kv_backend = Arc::new(MemoryKvBackend::default());

            let exist = u64::to_le_bytes(exist);
            kv_backend
                .put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
                .await
                .unwrap();

            let initial = 100;
            let seq = SequenceBuilder::new("s", kv_backend)
                .initial(initial)
                .build();

            let mut actual = Vec::with_capacity(expected.len());
            for _ in 0..expected.len() {
                actual.push(seq.next().await.unwrap());
            }
            assert_eq!(actual, expected);
        }

        // put a value not greater than the "initial", the sequence should start from "initial"
        test(1, vec![100, 101, 102]).await;
        test(100, vec![100, 101, 102]).await;

        // put a value greater than the "initial", the sequence should start from the put value
        test(200, vec![200, 201, 202]).await;
    }

    // Concurrent callers of one shared sequence must receive unique, gap-free values.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sequence_with_contention() {
        let seq = Arc::new(
            SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
                .initial(1024)
                .build(),
        );

        let (tx, mut rx) = mpsc::unbounded_channel();
        // Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
        for _ in 0..10 {
            tokio::spawn({
                let seq = seq.clone();
                let tx = tx.clone();
                async move {
                    for _ in 0..100 {
                        tx.send(seq.next().await.unwrap()).unwrap()
                    }
                }
            });
        }

        // Test that we get 1000 unique sequences, ranging from 1024 to 2023.
        let mut nums = HashSet::new();
        let mut c = 0;
        while c < 1000
            && let Some(x) = rx.recv().await
        {
            nums.insert(x);
            c += 1;
        }
        assert_eq!(nums.len(), 1000);
        let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
            unreachable!("nums has more than one elements");
        };
        assert_eq!(*min, 1024);
        assert_eq!(*max, 2023);
    }

    // With the default step, consecutive calls return consecutive values starting at `initial`.
    #[tokio::test]
    async fn test_sequence() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let initial = 1024;
        let seq = SequenceBuilder::new("test_seq", kv_backend)
            .initial(initial)
            .build();

        for i in initial..initial + 100 {
            assert_eq!(i, seq.next().await.unwrap());
        }
    }

    // `jump_to` only moves forward and the jump is visible to a freshly built sequence.
    #[tokio::test]
    async fn test_sequence_set() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
            .initial(1024)
            .step(10)
            .build();
        seq.jump_to(1025).await.unwrap();
        assert_eq!(seq.next().await.unwrap(), 1025);
        // The previous `next()` reserved [1025, 1035), so the stored value is 1035 and
        // jumping to 1025 again is rejected.
        let err = seq.jump_to(1025).await.unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert_eq!(seq.next().await.unwrap(), 1026);

        seq.jump_to(1048).await.unwrap();
        // Recreate the sequence to check that the jump was persisted in the backend.
        let seq = SequenceBuilder::new("test_seq", kv_backend)
            .initial(1024)
            .step(10)
            .build();
        assert_eq!(seq.next().await.unwrap(), 1048);
    }

    // Once every value below `max` (here the default `u64::MAX`) is used, `next()` fails.
    #[tokio::test]
    async fn test_sequence_out_of_range() {
        let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))
            .initial(u64::MAX - 10)
            .step(10)
            .build();

        // The 10 values in [u64::MAX - 10, u64::MAX) are still available.
        for _ in 0..10 {
            let _ = seq.next().await.unwrap();
        }

        let res = seq.next().await;
        assert!(res.is_err());
        assert!(matches!(res.unwrap_err(), Error::NextSequence { .. }))
    }

    // `next()` must give up with an error instead of looping forever when the
    // compare-and-put never succeeds.
    #[tokio::test]
    async fn test_sequence_force_quit() {
        // Backend stub: only `compare_and_put` may be called, and it always answers with
        // the default response (presumably `success == false` and no previous value;
        // see `CompareAndPutResponse::default`).
        struct Noop;

        impl TxnService for Noop {
            type Error = Error;
        }

        #[async_trait::async_trait]
        impl KvBackend for Noop {
            fn name(&self) -> &str {
                "Noop"
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            async fn range(&self, _: RangeRequest) -> Result<RangeResponse> {
                unreachable!()
            }

            async fn put(&self, _: PutRequest) -> Result<PutResponse> {
                unreachable!()
            }

            async fn batch_put(&self, _: BatchPutRequest) -> Result<BatchPutResponse> {
                unreachable!()
            }

            async fn batch_get(&self, _: BatchGetRequest) -> Result<BatchGetResponse> {
                unreachable!()
            }

            async fn compare_and_put(
                &self,
                _: CompareAndPutRequest,
            ) -> Result<CompareAndPutResponse> {
                Ok(CompareAndPutResponse::default())
            }

            async fn delete_range(&self, _: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
                unreachable!()
            }

            async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
                unreachable!()
            }
        }

        let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build();

        let next = seq.next().await;
        assert!(next.is_err());
    }

    // `peek()` reports the value stored in the backend, or `initial` when nothing is stored.
    #[tokio::test]
    async fn test_sequence_peek() {
        common_telemetry::init_default_ut_logging();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
            .step(10)
            .initial(1024)
            .build();
        // The sequence value in the kv backend is not set, so the peek value should be the initial value.
        assert_eq!(seq.peek().await.unwrap(), 1024);

        // 11 values with step 10 require two ranges: [1024, 1034) and [1034, 1044).
        for i in 0..11 {
            let v = seq.next().await.unwrap();
            assert_eq!(v, 1024 + i);
        }
        let seq = SequenceBuilder::new("test_seq", kv_backend)
            .initial(1024)
            .build();
        // A fresh instance has no local state; peek reads the stored value (end of the second range).
        assert_eq!(seq.peek().await.unwrap(), 1044);
    }

    // Two instances with the same name share one stored value, but each applies its own
    // `initial` and `step`.
    #[tokio::test]
    async fn test_sequence_peek_shared_storage() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let shared_seq = "shared_seq";

        // Create two sequence instances with the SAME name but DIFFERENT configs
        let seq1 = SequenceBuilder::new(shared_seq, kv_backend.clone())
            .initial(100)
            .step(5)
            .build();
        let seq2 = SequenceBuilder::new(shared_seq, kv_backend.clone())
            .initial(200) // different initial
            .step(3) // different step
            .build();

        // Initially both return their own initial values when no remote value exists
        assert_eq!(seq1.peek().await.unwrap(), 100);
        assert_eq!(seq2.peek().await.unwrap(), 200);

        // seq1 calls next() to allocate range and update remote storage
        assert_eq!(seq1.next().await.unwrap(), 100);
        // After seq1.next(), remote storage has 100 + seq1.step(5) = 105

        // peek() returns max(initial, remote): seq1 sees the stored 105, while seq2 still
        // reports its own initial because 200 > 105
        assert_eq!(seq1.peek().await.unwrap(), 105);
        assert_eq!(seq2.peek().await.unwrap(), 200); // remote is 105, which is below seq2's initial(200)

        // seq2 calls next(), should start from its initial(200)
        assert_eq!(seq2.next().await.unwrap(), 200);
        // After seq2.next(), remote storage updated to 200 + seq2.step(3) = 203

        // Both should see the new remote value (seq2's step was used)
        assert_eq!(seq1.peek().await.unwrap(), 203);
        assert_eq!(seq2.peek().await.unwrap(), 203);

        // seq1 first drains its locally cached range (101..=104); only then does it
        // reserve a new range, which starts at the remote value 203
        assert_eq!(seq1.next().await.unwrap(), 101);
        assert_eq!(seq1.next().await.unwrap(), 102);
        assert_eq!(seq1.next().await.unwrap(), 103);
        assert_eq!(seq1.next().await.unwrap(), 104);
        assert_eq!(seq1.next().await.unwrap(), 203);
        // After seq1 reserved the new range, remote storage updated to 203 + seq1.step(5) = 208
        assert_eq!(seq1.peek().await.unwrap(), 208);
        assert_eq!(seq2.peek().await.unwrap(), 208);
    }

    // A stored value below `initial` is overridden by `initial` in both `peek()` and `next()`.
    #[tokio::test]
    async fn test_sequence_peek_initial_max_logic() {
        let kv_backend = Arc::new(MemoryKvBackend::default());

        // Manually set a small value in storage
        let key = seq_name("test_max").into_bytes();
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(key)
                    .with_value(u64::to_le_bytes(50)),
            )
            .await
            .unwrap();

        // Create sequence with larger initial value
        let seq = SequenceBuilder::new("test_max", kv_backend)
            .initial(100) // larger than remote value (50)
            .build();

        // peek() should return max(initial, remote) = max(100, 50) = 100
        assert_eq!(seq.peek().await.unwrap(), 100);

        // next() should start from the larger initial value
        assert_eq!(seq.next().await.unwrap(), 100);
    }

    // Instances created one after another on the same key start at max(storage, initial).
    #[tokio::test]
    async fn test_sequence_initial_greater_than_storage() {
        let kv_backend = Arc::new(MemoryKvBackend::default());

        // Test sequence behavior when initial > storage value
        // This verifies the max(storage, initial) logic works correctly

        // Step 1: Establish a low value in storage
        let seq1 = SequenceBuilder::new("max_test", kv_backend.clone())
            .initial(10)
            .step(5)
            .build();
        assert_eq!(seq1.next().await.unwrap(), 10); // storage: 15

        // Step 2: Create sequence with much larger initial
        let seq2 = SequenceBuilder::new("max_test", kv_backend.clone())
            .initial(100) // much larger than storage (15)
            .step(5)
            .build();

        // seq2 should start from max(15, 100) = 100 (its initial value)
        assert_eq!(seq2.next().await.unwrap(), 100); // storage updated to: 105
        assert_eq!(seq2.peek().await.unwrap(), 105);

        // Step 3: Verify subsequent sequences continue from updated storage
        let seq3 = SequenceBuilder::new("max_test", kv_backend)
            .initial(50) // smaller than current storage (105)
            .step(1)
            .build();

        // seq3 should use max(105, 50) = 105 (storage value)
        assert_eq!(seq3.peek().await.unwrap(), 105);
        assert_eq!(seq3.next().await.unwrap(), 105); // storage: 106

        // This demonstrates the max(storage, initial) behavior:
        // - A sequence never generates values below its own initial value
        // - Storage holds the end of the most recently reserved range
        // - Value gaps (15-99) are accepted in order to honour the initial value
    }
}