1use std::ops::Range;
16use std::sync::Arc;
17
18use common_telemetry::{debug, warn};
19use snafu::ensure;
20use tokio::sync::Mutex;
21
22use crate::error::{self, Result};
23use crate::kv_backend::KvBackendRef;
24use crate::rpc::store::CompareAndPutRequest;
25
26pub type SequenceRef = Arc<Sequence>;
27
28pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
29
30pub struct SequenceBuilder {
31 name: String,
32 initial: u64,
33 step: u64,
34 generator: KvBackendRef,
35 max: u64,
36}
37
38fn seq_name(name: impl AsRef<str>) -> String {
39 format!("{}-{}", SEQ_PREFIX, name.as_ref())
40}
41
42impl SequenceBuilder {
43 pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
44 Self {
45 name: seq_name(name),
46 initial: 0,
47 step: 1,
48 generator,
49 max: u64::MAX,
50 }
51 }
52
53 pub fn initial(self, initial: u64) -> Self {
54 Self { initial, ..self }
55 }
56
57 pub fn step(self, step: u64) -> Self {
58 Self { step, ..self }
59 }
60
61 pub fn max(self, max: u64) -> Self {
62 Self { max, ..self }
63 }
64
65 pub fn build(self) -> Sequence {
66 Sequence {
67 inner: Mutex::new(Inner {
68 name: self.name,
69 generator: self.generator,
70 initial: self.initial,
71 next: self.initial,
72 step: self.step,
73 range: None,
74 force_quit: 1024,
75 max: self.max,
76 }),
77 }
78 }
79}
80
81pub struct Sequence {
82 inner: Mutex<Inner>,
83}
84
85impl Sequence {
86 pub async fn next(&self) -> Result<u64> {
88 let mut inner = self.inner.lock().await;
89 inner.next().await
90 }
91
92 pub async fn min_max(&self) -> Range<u64> {
94 let inner = self.inner.lock().await;
95 inner.initial..inner.max
96 }
97
98 pub async fn peek(&self) -> Result<u64> {
107 let inner = self.inner.lock().await;
108 inner.peek().await
109 }
110
111 pub async fn jump_to(&self, next: u64) -> Result<()> {
120 let mut inner = self.inner.lock().await;
121 inner.jump_to(next).await
122 }
123}
124
125struct Inner {
126 name: String,
127 generator: KvBackendRef,
128 initial: u64,
130 next: u64,
133 step: u64,
135 range: Option<Range<u64>>,
137 force_quit: usize,
139 max: u64,
140}
141
142impl Inner {
143 pub async fn next(&mut self) -> Result<u64> {
147 for _ in 0..self.force_quit {
148 match &self.range {
149 Some(range) => {
150 if range.contains(&self.next) {
151 let res = Ok(self.next);
152 self.next += 1;
153 debug!("sequence {} next: {}", self.name, self.next);
154 return res;
155 }
156 self.range = None;
157 }
158 None => {
159 let range = self.next_range().await?;
160 self.next = range.start;
161 self.range = Some(range);
162 debug!(
163 "sequence {} next: {}, range: {:?}",
164 self.name, self.next, self.range
165 );
166 }
167 }
168 }
169
170 error::NextSequenceSnafu {
171 err_msg: format!("{}.next()", &self.name),
172 }
173 .fail()
174 }
175
176 pub async fn peek(&self) -> Result<u64> {
179 let key = self.name.as_bytes();
180 let value = self.generator.get(key).await?.map(|kv| kv.value);
181 let next = if let Some(value) = value {
182 let next = self.initial.max(self.parse_sequence_value(value)?);
183 debug!("The next value of sequence {} is {}", self.name, next);
184 next
185 } else {
186 debug!(
187 "The next value of sequence {} is not set, use initial value {}",
188 self.name, self.initial
189 );
190 self.initial
191 };
192
193 Ok(next)
194 }
195
196 pub async fn next_range(&self) -> Result<Range<u64>> {
197 let key = self.name.as_bytes();
198 let mut start = self.next;
199
200 let mut expect = if start == self.initial {
201 vec![]
202 } else {
203 u64::to_le_bytes(start).to_vec()
204 };
205
206 for _ in 0..self.force_quit {
207 let step = self.step.min(self.max - start);
208
209 ensure!(
210 step > 0,
211 error::NextSequenceSnafu {
212 err_msg: format!("next sequence exhausted, max: {}", self.max)
213 }
214 );
215
216 let value = u64::to_le_bytes(start + step);
218
219 let req = CompareAndPutRequest {
220 key: key.to_vec(),
221 expect,
222 value: value.to_vec(),
223 };
224
225 let res = self.generator.compare_and_put(req).await?;
226
227 if !res.success {
228 if let Some(kv) = res.prev_kv {
229 let v = self.parse_sequence_value(kv.value.clone())?;
230 start = v.max(self.initial);
232 expect = kv.value;
233 } else {
234 start = self.initial;
235 expect = vec![];
236 }
237 continue;
238 }
239
240 return Ok(Range {
241 start,
242 end: start + step,
243 });
244 }
245
246 error::NextSequenceSnafu {
247 err_msg: format!("{}.next_range()", &self.name),
248 }
249 .fail()
250 }
251
252 pub async fn jump_to(&mut self, next: u64) -> Result<()> {
261 let key = self.name.as_bytes();
262 let current = self.generator.get(key).await?.map(|kv| kv.value);
263
264 let curr_val = match ¤t {
265 Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?),
266 None => self.initial,
267 };
268
269 ensure!(
270 next > curr_val,
271 error::UnexpectedSnafu {
272 err_msg: format!(
273 "The next value {} is not greater than the current next value {}",
274 next, curr_val
275 ),
276 }
277 );
278
279 let expect = current.unwrap_or_default();
280
281 let req = CompareAndPutRequest {
282 key: key.to_vec(),
283 expect,
284 value: u64::to_le_bytes(next).to_vec(),
285 };
286 let res = self.generator.compare_and_put(req).await?;
287 ensure!(
288 res.success,
289 error::UnexpectedSnafu {
290 err_msg: format!("Failed to reset sequence {} to {}", self.name, next),
291 }
292 );
293 warn!("Sequence {} jumped to {}", self.name, next);
294 self.initial = next;
296 self.next = next;
297 self.range = None;
298
299 Ok(())
300 }
301
302 fn parse_sequence_value(&self, value: Vec<u8>) -> Result<u64> {
304 let v: [u8; 8] = match value.try_into() {
305 Ok(a) => a,
306 Err(v) => {
307 return error::UnexpectedSequenceValueSnafu {
308 err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
309 }
310 .fail()
311 }
312 };
313 Ok(u64::from_le_bytes(v))
314 }
315}
316
317#[cfg(test)]
318mod tests {
319 use std::any::Any;
320 use std::assert_matches::assert_matches;
321 use std::collections::HashSet;
322 use std::sync::Arc;
323
324 use itertools::{Itertools, MinMaxResult};
325 use tokio::sync::mpsc;
326
327 use super::*;
328 use crate::error::Error;
329 use crate::kv_backend::memory::MemoryKvBackend;
330 use crate::kv_backend::{KvBackend, TxnService};
331 use crate::rpc::store::{
332 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
333 BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest,
334 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
335 };
336
337 #[tokio::test]
338 async fn test_sequence_with_existed_value() {
339 async fn test(exist: u64, expected: Vec<u64>) {
340 let kv_backend = Arc::new(MemoryKvBackend::default());
341
342 let exist = u64::to_le_bytes(exist);
343 kv_backend
344 .put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
345 .await
346 .unwrap();
347
348 let initial = 100;
349 let seq = SequenceBuilder::new("s", kv_backend)
350 .initial(initial)
351 .build();
352
353 let mut actual = Vec::with_capacity(expected.len());
354 for _ in 0..expected.len() {
355 actual.push(seq.next().await.unwrap());
356 }
357 assert_eq!(actual, expected);
358 }
359
360 test(1, vec![100, 101, 102]).await;
362 test(100, vec![100, 101, 102]).await;
363
364 test(200, vec![200, 201, 202]).await;
366 }
367
368 #[tokio::test(flavor = "multi_thread")]
369 async fn test_sequence_with_contention() {
370 let seq = Arc::new(
371 SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
372 .initial(1024)
373 .build(),
374 );
375
376 let (tx, mut rx) = mpsc::unbounded_channel();
377 for _ in 0..10 {
379 tokio::spawn({
380 let seq = seq.clone();
381 let tx = tx.clone();
382 async move {
383 for _ in 0..100 {
384 tx.send(seq.next().await.unwrap()).unwrap()
385 }
386 }
387 });
388 }
389
390 let mut nums = HashSet::new();
392 let mut c = 0;
393 while c < 1000
394 && let Some(x) = rx.recv().await
395 {
396 nums.insert(x);
397 c += 1;
398 }
399 assert_eq!(nums.len(), 1000);
400 let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
401 unreachable!("nums has more than one elements");
402 };
403 assert_eq!(*min, 1024);
404 assert_eq!(*max, 2023);
405 }
406
407 #[tokio::test]
408 async fn test_sequence() {
409 let kv_backend = Arc::new(MemoryKvBackend::default());
410 let initial = 1024;
411 let seq = SequenceBuilder::new("test_seq", kv_backend)
412 .initial(initial)
413 .build();
414
415 for i in initial..initial + 100 {
416 assert_eq!(i, seq.next().await.unwrap());
417 }
418 }
419
420 #[tokio::test]
421 async fn test_sequence_set() {
422 let kv_backend = Arc::new(MemoryKvBackend::default());
423 let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
424 .initial(1024)
425 .step(10)
426 .build();
427 seq.jump_to(1025).await.unwrap();
428 assert_eq!(seq.next().await.unwrap(), 1025);
429 let err = seq.jump_to(1025).await.unwrap_err();
430 assert_matches!(err, Error::Unexpected { .. });
431 assert_eq!(seq.next().await.unwrap(), 1026);
432
433 seq.jump_to(1048).await.unwrap();
434 let seq = SequenceBuilder::new("test_seq", kv_backend)
436 .initial(1024)
437 .step(10)
438 .build();
439 assert_eq!(seq.next().await.unwrap(), 1048);
440 }
441
442 #[tokio::test]
443 async fn test_sequence_out_of_range() {
444 let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))
445 .initial(u64::MAX - 10)
446 .step(10)
447 .build();
448
449 for _ in 0..10 {
450 let _ = seq.next().await.unwrap();
451 }
452
453 let res = seq.next().await;
454 assert!(res.is_err());
455 assert!(matches!(res.unwrap_err(), Error::NextSequence { .. }))
456 }
457
458 #[tokio::test]
459 async fn test_sequence_force_quit() {
460 struct Noop;
461
462 impl TxnService for Noop {
463 type Error = Error;
464 }
465
466 #[async_trait::async_trait]
467 impl KvBackend for Noop {
468 fn name(&self) -> &str {
469 "Noop"
470 }
471
472 fn as_any(&self) -> &dyn Any {
473 self
474 }
475
476 async fn range(&self, _: RangeRequest) -> Result<RangeResponse> {
477 unreachable!()
478 }
479
480 async fn put(&self, _: PutRequest) -> Result<PutResponse> {
481 unreachable!()
482 }
483
484 async fn batch_put(&self, _: BatchPutRequest) -> Result<BatchPutResponse> {
485 unreachable!()
486 }
487
488 async fn batch_get(&self, _: BatchGetRequest) -> Result<BatchGetResponse> {
489 unreachable!()
490 }
491
492 async fn compare_and_put(
493 &self,
494 _: CompareAndPutRequest,
495 ) -> Result<CompareAndPutResponse> {
496 Ok(CompareAndPutResponse::default())
497 }
498
499 async fn delete_range(&self, _: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
500 unreachable!()
501 }
502
503 async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
504 unreachable!()
505 }
506 }
507
508 let seq = SequenceBuilder::new("test_seq", Arc::new(Noop)).build();
509
510 let next = seq.next().await;
511 assert!(next.is_err());
512 }
513
514 #[tokio::test]
515 async fn test_sequence_peek() {
516 common_telemetry::init_default_ut_logging();
517 let kv_backend = Arc::new(MemoryKvBackend::default());
518 let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
519 .step(10)
520 .initial(1024)
521 .build();
522 assert_eq!(seq.peek().await.unwrap(), 1024);
524
525 for i in 0..11 {
526 let v = seq.next().await.unwrap();
527 assert_eq!(v, 1024 + i);
528 }
529 let seq = SequenceBuilder::new("test_seq", kv_backend)
530 .initial(1024)
531 .build();
532 assert_eq!(seq.peek().await.unwrap(), 1044);
534 }
535
536 #[tokio::test]
537 async fn test_sequence_peek_shared_storage() {
538 let kv_backend = Arc::new(MemoryKvBackend::default());
539 let shared_seq = "shared_seq";
540
541 let seq1 = SequenceBuilder::new(shared_seq, kv_backend.clone())
543 .initial(100)
544 .step(5)
545 .build();
546 let seq2 = SequenceBuilder::new(shared_seq, kv_backend.clone())
547 .initial(200) .step(3) .build();
550
551 assert_eq!(seq1.peek().await.unwrap(), 100);
553 assert_eq!(seq2.peek().await.unwrap(), 200);
554
555 assert_eq!(seq1.next().await.unwrap(), 100);
557 assert_eq!(seq1.peek().await.unwrap(), 105);
561 assert_eq!(seq2.peek().await.unwrap(), 200); assert_eq!(seq2.next().await.unwrap(), 200);
565 assert_eq!(seq1.peek().await.unwrap(), 203);
569 assert_eq!(seq2.peek().await.unwrap(), 203);
570
571 assert_eq!(seq1.next().await.unwrap(), 101);
573 assert_eq!(seq1.next().await.unwrap(), 102);
574 assert_eq!(seq1.next().await.unwrap(), 103);
575 assert_eq!(seq1.next().await.unwrap(), 104);
576 assert_eq!(seq1.next().await.unwrap(), 203);
577 assert_eq!(seq1.peek().await.unwrap(), 208);
579 assert_eq!(seq2.peek().await.unwrap(), 208);
580 }
581
582 #[tokio::test]
583 async fn test_sequence_peek_initial_max_logic() {
584 let kv_backend = Arc::new(MemoryKvBackend::default());
585
586 let key = seq_name("test_max").into_bytes();
588 kv_backend
589 .put(
590 PutRequest::new()
591 .with_key(key)
592 .with_value(u64::to_le_bytes(50)),
593 )
594 .await
595 .unwrap();
596
597 let seq = SequenceBuilder::new("test_max", kv_backend)
599 .initial(100) .build();
601
602 assert_eq!(seq.peek().await.unwrap(), 100);
604
605 assert_eq!(seq.next().await.unwrap(), 100);
607 }
608
609 #[tokio::test]
610 async fn test_sequence_initial_greater_than_storage() {
611 let kv_backend = Arc::new(MemoryKvBackend::default());
612
613 let seq1 = SequenceBuilder::new("max_test", kv_backend.clone())
618 .initial(10)
619 .step(5)
620 .build();
621 assert_eq!(seq1.next().await.unwrap(), 10); let seq2 = SequenceBuilder::new("max_test", kv_backend.clone())
625 .initial(100) .step(5)
627 .build();
628
629 assert_eq!(seq2.next().await.unwrap(), 100); assert_eq!(seq2.peek().await.unwrap(), 105);
632
633 let seq3 = SequenceBuilder::new("max_test", kv_backend)
635 .initial(50) .step(1)
637 .build();
638
639 assert_eq!(seq3.peek().await.unwrap(), 105);
641 assert_eq!(seq3.next().await.unwrap(), 105); }
648}