common_meta/
sequence.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use common_telemetry::{debug, warn};
19use snafu::ensure;
20use tokio::sync::Mutex;
21
22use crate::ddl::allocator::resource_id::ResourceIdAllocator;
23use crate::error::{self, Result};
24use crate::kv_backend::KvBackendRef;
25use crate::rpc::store::CompareAndPutRequest;
26
27pub type SequenceRef = Arc<Sequence>;
28
29pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
30
31pub struct SequenceBuilder {
32    name: String,
33    initial: u64,
34    step: u64,
35    generator: KvBackendRef,
36    max: u64,
37}
38
39fn seq_name(name: impl AsRef<str>) -> String {
40    format!("{}-{}", SEQ_PREFIX, name.as_ref())
41}
42
43impl SequenceBuilder {
44    pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
45        Self {
46            name: seq_name(name),
47            initial: 0,
48            step: 1,
49            generator,
50            max: u64::MAX,
51        }
52    }
53
54    pub fn initial(self, initial: u64) -> Self {
55        Self { initial, ..self }
56    }
57
58    pub fn step(self, step: u64) -> Self {
59        Self { step, ..self }
60    }
61
62    pub fn max(self, max: u64) -> Self {
63        Self { max, ..self }
64    }
65
66    pub fn build(self) -> Sequence {
67        Sequence {
68            inner: Mutex::new(Inner {
69                name: self.name,
70                generator: self.generator,
71                initial: self.initial,
72                next: self.initial,
73                step: self.step,
74                range: None,
75                force_quit: 1024,
76                max: self.max,
77            }),
78        }
79    }
80}
81
82pub struct Sequence {
83    inner: Mutex<Inner>,
84}
85
86#[async_trait::async_trait]
87impl ResourceIdAllocator for Sequence {
88    async fn next(&self) -> Result<u64> {
89        self.next().await
90    }
91
92    async fn peek(&self) -> Result<u64> {
93        self.peek().await
94    }
95
96    async fn jump_to(&self, next: u64) -> Result<()> {
97        self.jump_to(next).await
98    }
99
100    async fn min_max(&self) -> Range<u64> {
101        self.min_max().await
102    }
103}
104
105impl Sequence {
106    /// Returns the next value and increments the sequence.
107    pub async fn next(&self) -> Result<u64> {
108        let mut inner = self.inner.lock().await;
109        inner.next().await
110    }
111
112    /// Returns the range of available sequences.
113    pub async fn min_max(&self) -> Range<u64> {
114        let inner = self.inner.lock().await;
115        inner.initial..inner.max
116    }
117
118    /// Returns the current value stored in the remote storage without incrementing the sequence.
119    ///
120    /// This function always fetches the true current state from the remote storage (KV backend),
121    /// ignoring any local cache to provide the most accurate view of the sequence's remote state.
122    /// It does not consume or advance the sequence value.
123    ///
124    /// Note: Since this always queries the remote storage, it may be slower than `next()` but
125    /// provides the most accurate and up-to-date information about the sequence state.
126    pub async fn peek(&self) -> Result<u64> {
127        let inner = self.inner.lock().await;
128        inner.peek().await
129    }
130
131    /// Jumps to the given value.
132    ///
133    /// The next value must be greater than both:
134    /// 1. The current local next value
135    /// 2. The current value stored in the remote storage (KV backend)
136    ///
137    /// This ensures the sequence can only move forward and maintains consistency
138    /// across different instances accessing the same sequence.
139    pub async fn jump_to(&self, next: u64) -> Result<()> {
140        let mut inner = self.inner.lock().await;
141        inner.jump_to(next).await
142    }
143}
144
145struct Inner {
146    name: String,
147    generator: KvBackendRef,
148    // The initial(minimal) value of the sequence.
149    initial: u64,
150    // The next available sequences(if it is in the range,
151    // otherwise it need to fetch from generator again).
152    next: u64,
153    // Fetch several sequences at once: [start, start + step).
154    step: u64,
155    // The range of available sequences for the local cache.
156    range: Option<Range<u64>>,
157    // Used to avoid dead loops.
158    force_quit: usize,
159    max: u64,
160}
161
162impl Inner {
163    /// 1. returns the `next` value directly if it is in the `range` (local cache)
164    /// 2. fetch(CAS) next `range` from the `generator`
165    /// 3. jump to step 1
166    pub async fn next(&mut self) -> Result<u64> {
167        for _ in 0..self.force_quit {
168            match &self.range {
169                Some(range) => {
170                    if range.contains(&self.next) {
171                        let res = Ok(self.next);
172                        self.next += 1;
173                        debug!("sequence {} next: {}", self.name, self.next);
174                        return res;
175                    }
176                    self.range = None;
177                }
178                None => {
179                    let range = self.next_range().await?;
180                    self.next = range.start;
181                    self.range = Some(range);
182                    debug!(
183                        "sequence {} next: {}, range: {:?}",
184                        self.name, self.next, self.range
185                    );
186                }
187            }
188        }
189
190        error::NextSequenceSnafu {
191            err_msg: format!("{}.next()", &self.name),
192        }
193        .fail()
194    }
195
196    /// Returns the current value from remote storage without advancing the sequence.
197    /// If no value exists in remote storage, returns the initial value.
198    pub async fn peek(&self) -> Result<u64> {
199        let key = self.name.as_bytes();
200        let value = self.generator.get(key).await?.map(|kv| kv.value);
201        let next = if let Some(value) = value {
202            let next = self.initial.max(self.parse_sequence_value(value)?);
203            debug!("The next value of sequence {} is {}", self.name, next);
204            next
205        } else {
206            debug!(
207                "The next value of sequence {} is not set, use initial value {}",
208                self.name, self.initial
209            );
210            self.initial
211        };
212
213        Ok(next)
214    }
215
216    pub async fn next_range(&self) -> Result<Range<u64>> {
217        let key = self.name.as_bytes();
218        let mut start = self.next;
219
220        let mut expect = if start == self.initial {
221            vec![]
222        } else {
223            u64::to_le_bytes(start).to_vec()
224        };
225
226        for _ in 0..self.force_quit {
227            let step = self.step.min(self.max - start);
228
229            ensure!(
230                step > 0,
231                error::NextSequenceSnafu {
232                    err_msg: format!("next sequence exhausted, max: {}", self.max)
233                }
234            );
235
236            // No overflow: step <= self.max - start -> step + start <= self.max <= u64::MAX
237            let value = u64::to_le_bytes(start + step);
238
239            let req = CompareAndPutRequest {
240                key: key.to_vec(),
241                expect,
242                value: value.to_vec(),
243            };
244
245            let res = self.generator.compare_and_put(req).await?;
246
247            if !res.success {
248                if let Some(kv) = res.prev_kv {
249                    let v = self.parse_sequence_value(kv.value.clone())?;
250                    // If the existed value is smaller than the initial, we should start from the initial.
251                    start = v.max(self.initial);
252                    expect = kv.value;
253                } else {
254                    start = self.initial;
255                    expect = vec![];
256                }
257                continue;
258            }
259
260            return Ok(Range {
261                start,
262                end: start + step,
263            });
264        }
265
266        error::NextSequenceSnafu {
267            err_msg: format!("{}.next_range()", &self.name),
268        }
269        .fail()
270    }
271
272    /// Jumps to the given value.
273    ///
274    /// The next value must be greater than both:
275    /// 1. The current local next value (self.next)
276    /// 2. The current value stored in the remote storage (KV backend)
277    ///
278    /// This ensures the sequence can only move forward and maintains consistency
279    /// across different instances accessing the same sequence.
280    pub async fn jump_to(&mut self, next: u64) -> Result<()> {
281        let key = self.name.as_bytes();
282        let current = self.generator.get(key).await?.map(|kv| kv.value);
283
284        let curr_val = match &current {
285            Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?),
286            None => self.initial,
287        };
288
289        ensure!(
290            next > curr_val,
291            error::UnexpectedSnafu {
292                err_msg: format!(
293                    "The next value {} is not greater than the current next value {}",
294                    next, curr_val
295                ),
296            }
297        );
298
299        let expect = current.unwrap_or_default();
300
301        let req = CompareAndPutRequest {
302            key: key.to_vec(),
303            expect,
304            value: u64::to_le_bytes(next).to_vec(),
305        };
306        let res = self.generator.compare_and_put(req).await?;
307        ensure!(
308            res.success,
309            error::UnexpectedSnafu {
310                err_msg: format!("Failed to reset sequence {} to {}", self.name, next),
311            }
312        );
313        warn!("Sequence {} jumped to {}", self.name, next);
314        // Reset the sequence to the initial value.
315        self.initial = next;
316        self.next = next;
317        self.range = None;
318
319        Ok(())
320    }
321
322    /// Converts a Vec<u8> to u64 with proper error handling for sequence values
323    fn parse_sequence_value(&self, value: Vec<u8>) -> Result<u64> {
324        let v: [u8; 8] = match value.try_into() {
325            Ok(a) => a,
326            Err(v) => {
327                return error::UnexpectedSequenceValueSnafu {
328                    err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
329                }
330                .fail();
331            }
332        };
333        Ok(u64::from_le_bytes(v))
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use std::any::Any;
340    use std::assert_matches::assert_matches;
341    use std::collections::HashSet;
342    use std::sync::Arc;
343
344    use itertools::{Itertools, MinMaxResult};
345    use tokio::sync::mpsc;
346
347    use super::*;
348    use crate::error::Error;
349    use crate::kv_backend::memory::MemoryKvBackend;
350    use crate::kv_backend::{KvBackend, TxnService};
351    use crate::rpc::store::{
352        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
353        BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest,
354        DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
355    };
356
357    #[tokio::test]
358    async fn test_sequence_with_existed_value() {
359        async fn test(exist: u64, expected: Vec<u64>) {
360            let kv_backend = Arc::new(MemoryKvBackend::default());
361
362            let exist = u64::to_le_bytes(exist);
363            kv_backend
364                .put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
365                .await
366                .unwrap();
367
368            let initial = 100;
369            let seq = SequenceBuilder::new("s", kv_backend)
370                .initial(initial)
371                .build();
372
373            let mut actual = Vec::with_capacity(expected.len());
374            for _ in 0..expected.len() {
375                actual.push(seq.next().await.unwrap());
376            }
377            assert_eq!(actual, expected);
378        }
379
380        // put a value not greater than the "initial", the sequence should start from "initial"
381        test(1, vec![100, 101, 102]).await;
382        test(100, vec![100, 101, 102]).await;
383
384        // put a value greater than the "initial", the sequence should start from the put value
385        test(200, vec![200, 201, 202]).await;
386    }
387
388    #[tokio::test(flavor = "multi_thread")]
389    async fn test_sequence_with_contention() {
390        let seq = Arc::new(
391            SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
392                .initial(1024)
393                .build(),
394        );
395
396        let (tx, mut rx) = mpsc::unbounded_channel();
397        // Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
398        for _ in 0..10 {
399            tokio::spawn({
400                let seq = seq.clone();
401                let tx = tx.clone();
402                async move {
403                    for _ in 0..100 {
404                        tx.send(seq.next().await.unwrap()).unwrap()
405                    }
406                }
407            });
408        }
409
410        // Test that we get 1000 unique sequences, and start from 1024 to 2023.
411        let mut nums = HashSet::new();
412        let mut c = 0;
413        while c < 1000
414            && let Some(x) = rx.recv().await
415        {
416            nums.insert(x);
417            c += 1;
418        }
419        assert_eq!(nums.len(), 1000);
420        let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
421            unreachable!("nums has more than one elements");
422        };
423        assert_eq!(*min, 1024);
424        assert_eq!(*max, 2023);
425    }
426
427    #[tokio::test]
428    async fn test_sequence() {
429        let kv_backend = Arc::new(MemoryKvBackend::default());
430        let initial = 1024;
431        let seq = SequenceBuilder::new("test_seq", kv_backend)
432            .initial(initial)
433            .build();
434
435        for i in initial..initial + 100 {
436            assert_eq!(i, seq.next().await.unwrap());
437        }
438    }
439
440    #[tokio::test]
441    async fn test_sequence_set() {
442        let kv_backend = Arc::new(MemoryKvBackend::default());
443        let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
444            .initial(1024)
445            .step(10)
446            .build();
447        seq.jump_to(1025).await.unwrap();
448        assert_eq!(seq.next().await.unwrap(), 1025);
449        let err = seq.jump_to(1025).await.unwrap_err();
450        assert_matches!(err, Error::Unexpected { .. });
451        assert_eq!(seq.next().await.unwrap(), 1026);
452
453        seq.jump_to(1048).await.unwrap();
454        // Recreate the sequence to test the sequence is reset correctly.
455        let seq = SequenceBuilder::new("test_seq", kv_backend)
456            .initial(1024)
457            .step(10)
458            .build();
459        assert_eq!(seq.next().await.unwrap(), 1048);
460    }
461
462    #[tokio::test]
463    async fn test_sequence_out_of_range() {
464        let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))
465            .initial(u64::MAX - 10)
466            .step(10)
467            .build();
468
469        for _ in 0..10 {
470            let _ = seq.next().await.unwrap();
471        }
472
473        let res = seq.next().await;
474        assert!(res.is_err());
475        assert!(matches!(res.unwrap_err(), Error::NextSequence { .. }))
476    }
477
478    #[tokio::test]
479    async fn test_sequence_force_quit() {
480        struct Noop;
481
482        impl TxnService for Noop {
483            type Error = Error;
484        }
485
486        #[async_trait::async_trait]
487        impl KvBackend for Noop {
488            fn name(&self) -> &str {
489                "Noop"
490            }
491
492            fn as_any(&self) -> &dyn Any {
493                self
494            }
495
496            async fn range(&self, _: RangeRequest) -> Result<RangeResponse> {
497                unreachable!()
498            }
499
500            async fn put(&self, _: PutRequest) -> Result<PutResponse> {
501                unreachable!()
502            }
503
504            async fn batch_put(&self, _: BatchPutRequest) -> Result<BatchPutResponse> {
505                unreachable!()
506            }
507
508            async fn batch_get(&self, _: BatchGetRequest) -> Result<BatchGetResponse> {
509                unreachable!()
510            }
511
512            async fn compare_and_put(
513                &self,
514                _: CompareAndPutRequest,
515            ) -> Result<CompareAndPutResponse> {
516                Ok(CompareAndPutResponse::default())
517            }
518
519            async fn delete_range(&self, _: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
520                unreachable!()
521            }
522
523            async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
524                unreachable!()
525            }
526        }
527
528        let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build();
529
530        let next = seq.next().await;
531        assert!(next.is_err());
532    }
533
534    #[tokio::test]
535    async fn test_sequence_peek() {
536        common_telemetry::init_default_ut_logging();
537        let kv_backend = Arc::new(MemoryKvBackend::default());
538        let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
539            .step(10)
540            .initial(1024)
541            .build();
542        // The sequence value in the kv backend is not set, so the peek value should be the initial value.
543        assert_eq!(seq.peek().await.unwrap(), 1024);
544
545        for i in 0..11 {
546            let v = seq.next().await.unwrap();
547            assert_eq!(v, 1024 + i);
548        }
549        let seq = SequenceBuilder::new("test_seq", kv_backend)
550            .initial(1024)
551            .build();
552        // The sequence is not initialized, it will fetch the value from the kv backend.
553        assert_eq!(seq.peek().await.unwrap(), 1044);
554    }
555
556    #[tokio::test]
557    async fn test_sequence_peek_shared_storage() {
558        let kv_backend = Arc::new(MemoryKvBackend::default());
559        let shared_seq = "shared_seq";
560
561        // Create two sequence instances with the SAME name but DIFFERENT configs
562        let seq1 = SequenceBuilder::new(shared_seq, kv_backend.clone())
563            .initial(100)
564            .step(5)
565            .build();
566        let seq2 = SequenceBuilder::new(shared_seq, kv_backend.clone())
567            .initial(200) // different initial
568            .step(3) // different step
569            .build();
570
571        // Initially both return their own initial values when no remote value exists
572        assert_eq!(seq1.peek().await.unwrap(), 100);
573        assert_eq!(seq2.peek().await.unwrap(), 200);
574
575        // seq1 calls next() to allocate range and update remote storage
576        assert_eq!(seq1.next().await.unwrap(), 100);
577        // After seq1.next(), remote storage has 100 + seq1.step(5) = 105
578
579        // seq2 should now see the updated remote value through peek(), not its own initial(200)
580        assert_eq!(seq1.peek().await.unwrap(), 105);
581        assert_eq!(seq2.peek().await.unwrap(), 200); // sees seq1's update, but use its own initial(200)
582
583        // seq2 calls next(), should start from its initial(200)
584        assert_eq!(seq2.next().await.unwrap(), 200);
585        // After seq2.next(), remote storage updated to 200 + seq2.step(3) = 203
586
587        // Both should see the new remote value (seq2's step was used)
588        assert_eq!(seq1.peek().await.unwrap(), 203);
589        assert_eq!(seq2.peek().await.unwrap(), 203);
590
591        // seq1 calls next(), should start from its next(105)
592        assert_eq!(seq1.next().await.unwrap(), 101);
593        assert_eq!(seq1.next().await.unwrap(), 102);
594        assert_eq!(seq1.next().await.unwrap(), 103);
595        assert_eq!(seq1.next().await.unwrap(), 104);
596        assert_eq!(seq1.next().await.unwrap(), 203);
597        // After seq1.next(), remote storage updated to 203 + seq1.step(5) = 208
598        assert_eq!(seq1.peek().await.unwrap(), 208);
599        assert_eq!(seq2.peek().await.unwrap(), 208);
600    }
601
602    #[tokio::test]
603    async fn test_sequence_peek_initial_max_logic() {
604        let kv_backend = Arc::new(MemoryKvBackend::default());
605
606        // Manually set a small value in storage
607        let key = seq_name("test_max").into_bytes();
608        kv_backend
609            .put(
610                PutRequest::new()
611                    .with_key(key)
612                    .with_value(u64::to_le_bytes(50)),
613            )
614            .await
615            .unwrap();
616
617        // Create sequence with larger initial value
618        let seq = SequenceBuilder::new("test_max", kv_backend)
619            .initial(100) // larger than remote value (50)
620            .build();
621
622        // peek() should return max(initial, remote) = max(100, 50) = 100
623        assert_eq!(seq.peek().await.unwrap(), 100);
624
625        // next() should start from the larger initial value
626        assert_eq!(seq.next().await.unwrap(), 100);
627    }
628
629    #[tokio::test]
630    async fn test_sequence_initial_greater_than_storage() {
631        let kv_backend = Arc::new(MemoryKvBackend::default());
632
633        // Test sequence behavior when initial > storage value
634        // This verifies the max(storage, initial) logic works correctly
635
636        // Step 1: Establish a low value in storage
637        let seq1 = SequenceBuilder::new("max_test", kv_backend.clone())
638            .initial(10)
639            .step(5)
640            .build();
641        assert_eq!(seq1.next().await.unwrap(), 10); // storage: 15
642
643        // Step 2: Create sequence with much larger initial
644        let seq2 = SequenceBuilder::new("max_test", kv_backend.clone())
645            .initial(100) // much larger than storage (15)
646            .step(5)
647            .build();
648
649        // seq2 should start from max(15, 100) = 100 (its initial value)
650        assert_eq!(seq2.next().await.unwrap(), 100); // storage updated to: 105
651        assert_eq!(seq2.peek().await.unwrap(), 105);
652
653        // Step 3: Verify subsequent sequences continue from updated storage
654        let seq3 = SequenceBuilder::new("max_test", kv_backend)
655            .initial(50) // smaller than current storage (105)
656            .step(1)
657            .build();
658
659        // seq3 should use max(105, 50) = 105 (storage value)
660        assert_eq!(seq3.peek().await.unwrap(), 105);
661        assert_eq!(seq3.next().await.unwrap(), 105); // storage: 106
662
663        // This demonstrates the correct max(storage, initial) behavior:
664        // - Sequences never generate values below their initial requirement
665        // - Storage always reflects the highest allocated value
666        // - Value gaps (15-99) are acceptable to maintain minimum constraints
667    }
668}