1use std::ops::Range;
16use std::sync::Arc;
17
18use common_telemetry::{debug, warn};
19use snafu::ensure;
20use tokio::sync::Mutex;
21
22use crate::ddl::allocator::resource_id::ResourceIdAllocator;
23use crate::error::{self, Result};
24use crate::kv_backend::KvBackendRef;
25use crate::rpc::store::CompareAndPutRequest;
26
27pub type SequenceRef = Arc<Sequence>;
28
/// Prefix that `seq_name` puts in front of every sequence name to form its kv-backend key.
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
30
31pub struct SequenceBuilder {
32 name: String,
33 initial: u64,
34 step: u64,
35 generator: KvBackendRef,
36 max: u64,
37}
38
39fn seq_name(name: impl AsRef<str>) -> String {
40 format!("{}-{}", SEQ_PREFIX, name.as_ref())
41}
42
43impl SequenceBuilder {
44 pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
45 Self {
46 name: seq_name(name),
47 initial: 0,
48 step: 1,
49 generator,
50 max: u64::MAX,
51 }
52 }
53
54 pub fn initial(self, initial: u64) -> Self {
55 Self { initial, ..self }
56 }
57
58 pub fn step(self, step: u64) -> Self {
59 Self { step, ..self }
60 }
61
62 pub fn max(self, max: u64) -> Self {
63 Self { max, ..self }
64 }
65
66 pub fn build(self) -> Sequence {
67 Sequence {
68 inner: Mutex::new(Inner {
69 name: self.name,
70 generator: self.generator,
71 initial: self.initial,
72 next: self.initial,
73 step: self.step,
74 range: None,
75 force_quit: 1024,
76 max: self.max,
77 }),
78 }
79 }
80}
81
82pub struct Sequence {
83 inner: Mutex<Inner>,
84}
85
86#[async_trait::async_trait]
87impl ResourceIdAllocator for Sequence {
88 async fn next(&self) -> Result<u64> {
89 self.next().await
90 }
91
92 async fn peek(&self) -> Result<u64> {
93 self.peek().await
94 }
95
96 async fn jump_to(&self, next: u64) -> Result<()> {
97 self.jump_to(next).await
98 }
99
100 async fn min_max(&self) -> Range<u64> {
101 self.min_max().await
102 }
103}
104
105impl Sequence {
106 pub async fn next(&self) -> Result<u64> {
108 let mut inner = self.inner.lock().await;
109 inner.next().await
110 }
111
112 pub async fn min_max(&self) -> Range<u64> {
114 let inner = self.inner.lock().await;
115 inner.initial..inner.max
116 }
117
118 pub async fn peek(&self) -> Result<u64> {
127 let inner = self.inner.lock().await;
128 inner.peek().await
129 }
130
131 pub async fn jump_to(&self, next: u64) -> Result<()> {
140 let mut inner = self.inner.lock().await;
141 inner.jump_to(next).await
142 }
143}
144
145struct Inner {
146 name: String,
147 generator: KvBackendRef,
148 initial: u64,
150 next: u64,
153 step: u64,
155 range: Option<Range<u64>>,
157 force_quit: usize,
159 max: u64,
160}
161
162impl Inner {
163 pub async fn next(&mut self) -> Result<u64> {
167 for _ in 0..self.force_quit {
168 match &self.range {
169 Some(range) => {
170 if range.contains(&self.next) {
171 let res = Ok(self.next);
172 self.next += 1;
173 debug!("sequence {} next: {}", self.name, self.next);
174 return res;
175 }
176 self.range = None;
177 }
178 None => {
179 let range = self.next_range().await?;
180 self.next = range.start;
181 self.range = Some(range);
182 debug!(
183 "sequence {} next: {}, range: {:?}",
184 self.name, self.next, self.range
185 );
186 }
187 }
188 }
189
190 error::NextSequenceSnafu {
191 err_msg: format!("{}.next()", &self.name),
192 }
193 .fail()
194 }
195
196 pub async fn peek(&self) -> Result<u64> {
199 let key = self.name.as_bytes();
200 let value = self.generator.get(key).await?.map(|kv| kv.value);
201 let next = if let Some(value) = value {
202 let next = self.initial.max(self.parse_sequence_value(value)?);
203 debug!("The next value of sequence {} is {}", self.name, next);
204 next
205 } else {
206 debug!(
207 "The next value of sequence {} is not set, use initial value {}",
208 self.name, self.initial
209 );
210 self.initial
211 };
212
213 Ok(next)
214 }
215
216 pub async fn next_range(&self) -> Result<Range<u64>> {
217 let key = self.name.as_bytes();
218 let mut start = self.next;
219
220 let mut expect = if start == self.initial {
221 vec![]
222 } else {
223 u64::to_le_bytes(start).to_vec()
224 };
225
226 for _ in 0..self.force_quit {
227 let step = self.step.min(self.max - start);
228
229 ensure!(
230 step > 0,
231 error::NextSequenceSnafu {
232 err_msg: format!("next sequence exhausted, max: {}", self.max)
233 }
234 );
235
236 let value = u64::to_le_bytes(start + step);
238
239 let req = CompareAndPutRequest {
240 key: key.to_vec(),
241 expect,
242 value: value.to_vec(),
243 };
244
245 let res = self.generator.compare_and_put(req).await?;
246
247 if !res.success {
248 if let Some(kv) = res.prev_kv {
249 let v = self.parse_sequence_value(kv.value.clone())?;
250 start = v.max(self.initial);
252 expect = kv.value;
253 } else {
254 start = self.initial;
255 expect = vec![];
256 }
257 continue;
258 }
259
260 return Ok(Range {
261 start,
262 end: start + step,
263 });
264 }
265
266 error::NextSequenceSnafu {
267 err_msg: format!("{}.next_range()", &self.name),
268 }
269 .fail()
270 }
271
272 pub async fn jump_to(&mut self, next: u64) -> Result<()> {
281 let key = self.name.as_bytes();
282 let current = self.generator.get(key).await?.map(|kv| kv.value);
283
284 let curr_val = match ¤t {
285 Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?),
286 None => self.initial,
287 };
288
289 ensure!(
290 next > curr_val,
291 error::UnexpectedSnafu {
292 err_msg: format!(
293 "The next value {} is not greater than the current next value {}",
294 next, curr_val
295 ),
296 }
297 );
298
299 let expect = current.unwrap_or_default();
300
301 let req = CompareAndPutRequest {
302 key: key.to_vec(),
303 expect,
304 value: u64::to_le_bytes(next).to_vec(),
305 };
306 let res = self.generator.compare_and_put(req).await?;
307 ensure!(
308 res.success,
309 error::UnexpectedSnafu {
310 err_msg: format!("Failed to reset sequence {} to {}", self.name, next),
311 }
312 );
313 warn!("Sequence {} jumped to {}", self.name, next);
314 self.initial = next;
316 self.next = next;
317 self.range = None;
318
319 Ok(())
320 }
321
322 fn parse_sequence_value(&self, value: Vec<u8>) -> Result<u64> {
324 let v: [u8; 8] = match value.try_into() {
325 Ok(a) => a,
326 Err(v) => {
327 return error::UnexpectedSequenceValueSnafu {
328 err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
329 }
330 .fail();
331 }
332 };
333 Ok(u64::from_le_bytes(v))
334 }
335}
336
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::assert_matches::assert_matches;
    use std::collections::HashSet;
    use std::sync::Arc;

    use itertools::{Itertools, MinMaxResult};
    use tokio::sync::mpsc;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::{KvBackend, TxnService};
    use crate::rpc::store::{
        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
        BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest,
        DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
    };

    /// A position already stored in the backend is only honoured when it is above `initial`.
    #[tokio::test]
    async fn test_sequence_with_existed_value() {
        async fn check(stored: u64, expected: Vec<u64>) {
            let backend = Arc::new(MemoryKvBackend::default());

            let put_req = PutRequest::new()
                .with_key(seq_name("s"))
                .with_value(u64::to_le_bytes(stored));
            backend.put(put_req).await.unwrap();

            let seq = SequenceBuilder::new("s", backend).initial(100).build();

            let mut actual = Vec::with_capacity(expected.len());
            for _ in &expected {
                actual.push(seq.next().await.unwrap());
            }
            assert_eq!(actual, expected);
        }

        // Stored position below `initial`: the sequence starts at `initial`.
        check(1, vec![100, 101, 102]).await;
        // Stored position equal to `initial`.
        check(100, vec![100, 101, 102]).await;

        // Stored position above `initial`: the sequence continues from the stored position.
        check(200, vec![200, 201, 202]).await;
    }

    /// Ten tasks drawing 100 values each from one sequence never see a duplicate.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sequence_with_contention() {
        let backend = Arc::new(MemoryKvBackend::default());
        let seq = Arc::new(SequenceBuilder::new("s", backend).initial(1024).build());

        let (tx, mut rx) = mpsc::unbounded_channel();
        for _ in 0..10 {
            let seq = seq.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                for _ in 0..100 {
                    tx.send(seq.next().await.unwrap()).unwrap()
                }
            });
        }

        // Collect at most 1000 values, stopping early if the channel closes.
        let mut seen = HashSet::new();
        for _ in 0..1000 {
            let Some(value) = rx.recv().await else {
                break;
            };
            seen.insert(value);
        }
        assert_eq!(seen.len(), 1000);

        let (lowest, highest) = match seen.iter().minmax() {
            MinMaxResult::MinMax(lowest, highest) => (lowest, highest),
            _ => unreachable!("nums has more than one elements"),
        };
        assert_eq!(*lowest, 1024);
        assert_eq!(*highest, 2023);
    }

    /// Values come out consecutively, starting at `initial`.
    #[tokio::test]
    async fn test_sequence() {
        let backend = Arc::new(MemoryKvBackend::default());
        let initial = 1024;
        let seq = SequenceBuilder::new("test_seq", backend)
            .initial(initial)
            .build();

        for offset in 0..100 {
            assert_eq!(initial + offset, seq.next().await.unwrap());
        }
    }

    /// `jump_to` only moves forward, and the jump is visible to a new instance.
    #[tokio::test]
    async fn test_sequence_set() {
        let backend = Arc::new(MemoryKvBackend::default());
        let seq = SequenceBuilder::new("test_seq", backend.clone())
            .initial(1024)
            .step(10)
            .build();

        seq.jump_to(1025).await.unwrap();
        assert_eq!(seq.next().await.unwrap(), 1025);

        // Jumping to a value that is not ahead of the current position is rejected and leaves
        // the sequence untouched.
        let rejected = seq.jump_to(1025).await.unwrap_err();
        assert_matches!(rejected, Error::Unexpected { .. });
        assert_eq!(seq.next().await.unwrap(), 1026);

        seq.jump_to(1048).await.unwrap();

        // A fresh instance over the same backend picks up the jumped position.
        let reopened = SequenceBuilder::new("test_seq", backend)
            .initial(1024)
            .step(10)
            .build();
        assert_eq!(reopened.next().await.unwrap(), 1048);
    }

    /// Once every value below `max` is handed out, `next` fails with `NextSequence`.
    #[tokio::test]
    async fn test_sequence_out_of_range() {
        let backend = Arc::new(MemoryKvBackend::default());
        let seq = SequenceBuilder::new("test_seq", backend)
            .initial(u64::MAX - 10)
            .step(10)
            .build();

        for _ in 0..10 {
            let _ = seq.next().await.unwrap();
        }

        let exhausted = seq.next().await.unwrap_err();
        assert_matches!(exhausted, Error::NextSequence { .. });
    }

    /// A backend whose compare-and-put never succeeds makes `next` give up with an error
    /// instead of looping forever.
    #[tokio::test]
    async fn test_sequence_force_quit() {
        struct Noop;

        impl TxnService for Noop {
            type Error = Error;
        }

        #[async_trait::async_trait]
        impl KvBackend for Noop {
            fn name(&self) -> &str {
                "Noop"
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            async fn range(&self, _: RangeRequest) -> Result<RangeResponse> {
                unreachable!()
            }

            async fn put(&self, _: PutRequest) -> Result<PutResponse> {
                unreachable!()
            }

            async fn batch_put(&self, _: BatchPutRequest) -> Result<BatchPutResponse> {
                unreachable!()
            }

            async fn batch_get(&self, _: BatchGetRequest) -> Result<BatchGetResponse> {
                unreachable!()
            }

            // The only operation the test reaches: always answers with the default response.
            async fn compare_and_put(
                &self,
                _: CompareAndPutRequest,
            ) -> Result<CompareAndPutResponse> {
                Ok(CompareAndPutResponse::default())
            }

            async fn delete_range(&self, _: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
                unreachable!()
            }

            async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
                unreachable!()
            }
        }

        let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build();

        assert!(seq.next().await.is_err());
    }

    /// `peek` reports the position persisted in the backend.
    #[tokio::test]
    async fn test_sequence_peek() {
        common_telemetry::init_default_ut_logging();
        let backend = Arc::new(MemoryKvBackend::default());
        let seq = SequenceBuilder::new("test_seq", backend.clone())
            .step(10)
            .initial(1024)
            .build();
        // Nothing stored yet: `peek` yields `initial`.
        assert_eq!(seq.peek().await.unwrap(), 1024);

        // Eleven values with a step of 10 reserve two ranges.
        for offset in 0..11 {
            let value = seq.next().await.unwrap();
            assert_eq!(value, 1024 + offset);
        }

        let reopened = SequenceBuilder::new("test_seq", backend)
            .initial(1024)
            .build();
        assert_eq!(reopened.peek().await.unwrap(), 1044);
    }

    /// Two sequences with different settings sharing one backend key.
    #[tokio::test]
    async fn test_sequence_peek_shared_storage() {
        let backend = Arc::new(MemoryKvBackend::default());
        let shared_seq = "shared_seq";

        let seq1 = SequenceBuilder::new(shared_seq, backend.clone())
            .initial(100)
            .step(5)
            .build();
        let seq2 = SequenceBuilder::new(shared_seq, backend.clone())
            .initial(200)
            .step(3)
            .build();

        // Nothing stored yet: each instance reports its own `initial`.
        assert_eq!(seq1.peek().await.unwrap(), 100);
        assert_eq!(seq2.peek().await.unwrap(), 200);

        // seq1 reserves 100..105.
        assert_eq!(seq1.next().await.unwrap(), 100);
        assert_eq!(seq1.peek().await.unwrap(), 105);

        // The stored position is below seq2's `initial`, so seq2 still starts at 200 and
        // reserves 200..203.
        assert_eq!(seq2.peek().await.unwrap(), 200);
        assert_eq!(seq2.next().await.unwrap(), 200);
        assert_eq!(seq1.peek().await.unwrap(), 203);
        assert_eq!(seq2.peek().await.unwrap(), 203);

        // seq1 first drains the range it reserved earlier ...
        for expected in 101..=104 {
            assert_eq!(seq1.next().await.unwrap(), expected);
        }
        // ... and then continues from the shared stored position.
        assert_eq!(seq1.next().await.unwrap(), 203);
        assert_eq!(seq1.peek().await.unwrap(), 208);
        assert_eq!(seq2.peek().await.unwrap(), 208);
    }

    /// A stored position below `initial` is overridden by `initial`.
    #[tokio::test]
    async fn test_sequence_peek_initial_max_logic() {
        let backend = Arc::new(MemoryKvBackend::default());

        let put_req = PutRequest::new()
            .with_key(seq_name("test_max").into_bytes())
            .with_value(u64::to_le_bytes(50));
        backend.put(put_req).await.unwrap();

        let seq = SequenceBuilder::new("test_max", backend)
            .initial(100)
            .build();

        assert_eq!(seq.peek().await.unwrap(), 100);

        assert_eq!(seq.next().await.unwrap(), 100);
    }

    /// The larger of `initial` and the stored position always wins.
    #[tokio::test]
    async fn test_sequence_initial_greater_than_storage() {
        let backend = Arc::new(MemoryKvBackend::default());

        let seq1 = SequenceBuilder::new("max_test", backend.clone())
            .initial(10)
            .step(5)
            .build();
        assert_eq!(seq1.next().await.unwrap(), 10);

        // `initial` above the stored position: start at `initial`.
        let seq2 = SequenceBuilder::new("max_test", backend.clone())
            .initial(100)
            .step(5)
            .build();

        assert_eq!(seq2.next().await.unwrap(), 100);
        assert_eq!(seq2.peek().await.unwrap(), 105);

        // `initial` below the stored position: continue from the stored position.
        let seq3 = SequenceBuilder::new("max_test", backend)
            .initial(50)
            .step(1)
            .build();

        assert_eq!(seq3.peek().await.unwrap(), 105);
        assert_eq!(seq3.next().await.unwrap(), 105);
    }
}