1use std::collections::{BTreeMap, BTreeSet};
18use std::ops::Bound;
19use std::sync::Arc;
20
21use common_meta::key::flow::flow_state::FlowStat;
22use common_telemetry::trace;
23use datatypes::value::Value;
24use get_size2::GetSize;
25use smallvec::{smallvec, SmallVec};
26use tokio::sync::{mpsc, oneshot, RwLock};
27use tokio::time::Instant;
28
29use crate::error::InternalSnafu;
30use crate::expr::{EvalError, ScalarExpr};
31use crate::repr::{value_to_internal_ts, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
32
33pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
35
36pub fn get_value_heap_size(v: &Value) -> usize {
38 match v {
39 Value::Binary(bin) => bin.len(),
40 Value::String(s) => s.len(),
41 Value::List(list) => list.items().iter().map(get_value_heap_size).sum(),
42 _ => 0,
43 }
44}
45
46#[derive(Clone)]
47pub struct SizeReportSender {
48 inner: mpsc::Sender<oneshot::Sender<FlowStat>>,
49}
50
51impl SizeReportSender {
52 pub fn new() -> (Self, StateReportHandler) {
53 let (tx, rx) = mpsc::channel(1);
54 let zelf = Self { inner: tx };
55 (zelf, rx)
56 }
57
58 pub async fn query(&self, timeout: std::time::Duration) -> crate::Result<FlowStat> {
60 let (tx, rx) = oneshot::channel();
61 self.inner.send(tx).await.map_err(|_| {
62 InternalSnafu {
63 reason: "failed to send size report request due to receiver dropped",
64 }
65 .build()
66 })?;
67 let timeout = tokio::time::timeout(timeout, rx);
68 timeout
69 .await
70 .map_err(|_elapsed| {
71 InternalSnafu {
72 reason: "failed to receive size report after one second timeout",
73 }
74 .build()
75 })?
76 .map_err(|_| {
77 InternalSnafu {
78 reason: "failed to receive size report due to sender dropped",
79 }
80 .build()
81 })
82 }
83}
84
85pub type StateReportHandler = mpsc::Receiver<oneshot::Sender<FlowStat>>;
87
88pub type Spine = BTreeMap<Timestamp, Batch>;
91
92#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
98pub struct KeyExpiryManager {
99 event_ts_to_key: BTreeMap<Timestamp, BTreeSet<Row>>,
101
102 key_expiration_duration: Option<Duration>,
104
105 event_timestamp_from_row: Option<ScalarExpr>,
107}
108
109impl GetSize for KeyExpiryManager {
110 fn get_heap_size(&self) -> usize {
111 let row_size = if let Some(row_size) = &self
112 .event_ts_to_key
113 .first_key_value()
114 .map(|(_, v)| v.first().get_heap_size())
115 {
116 *row_size
117 } else {
118 0
119 };
120 self.event_ts_to_key
121 .values()
122 .map(|v| v.len() * row_size + std::mem::size_of::<i64>())
123 .sum::<usize>()
124 }
125}
126
127impl KeyExpiryManager {
128 pub fn new(
129 key_expiration_duration: Option<Duration>,
130 event_timestamp_from_row: Option<ScalarExpr>,
131 ) -> Self {
132 Self {
133 event_ts_to_key: Default::default(),
134 key_expiration_duration,
135 event_timestamp_from_row,
136 }
137 }
138
139 pub fn extract_event_ts(&self, row: &Row) -> Result<Option<Timestamp>, EvalError> {
143 let ts = self
144 .event_timestamp_from_row
145 .as_ref()
146 .map(|e| e.eval(&row.inner))
147 .transpose()?
148 .map(value_to_internal_ts)
149 .transpose()?;
150 Ok(ts)
151 }
152
153 pub fn compute_expiration_timestamp(&self, now: Timestamp) -> Option<Timestamp> {
155 self.key_expiration_duration.map(|d| now - d)
156 }
157
158 pub fn get_expire_duration_and_update_event_ts(
163 &mut self,
164 now: Timestamp,
165 row: &Row,
166 ) -> Result<Option<Duration>, EvalError> {
167 let Some(event_ts) = self.extract_event_ts(row)? else {
168 return Ok(None);
169 };
170
171 self.event_ts_to_key
172 .entry(event_ts)
173 .or_default()
174 .insert(row.clone());
175
176 if let Some(expire_time) = self.compute_expiration_timestamp(now) {
177 if expire_time > event_ts {
178 return Ok(Some(expire_time - event_ts));
180 }
181 }
182
183 Ok(None)
184 }
185
186 pub fn get_expire_duration(
190 &self,
191 now: Timestamp,
192 row: &Row,
193 ) -> Result<Option<Duration>, EvalError> {
194 let Some(event_ts) = self.extract_event_ts(row)? else {
195 return Ok(None);
196 };
197
198 if let Some(expire_time) = self.compute_expiration_timestamp(now) {
199 if expire_time > event_ts {
200 return Ok(Some(expire_time - event_ts));
202 }
203 }
204
205 Ok(None)
206 }
207
208 pub fn remove_expired_keys(&mut self, now: Timestamp) -> Option<impl Iterator<Item = Row>> {
211 let expire_time = self.compute_expiration_timestamp(now)?;
212
213 let mut before = self.event_ts_to_key.split_off(&expire_time);
214 std::mem::swap(&mut before, &mut self.event_ts_to_key);
215
216 Some(before.into_iter().flat_map(|(_ts, keys)| keys.into_iter()))
217 }
218}
219
220#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
233pub struct Arrangement {
234 name: Vec<String>,
237
238 spine: Spine,
254
255 full_arrangement: bool,
260
261 is_written: bool,
268
269 expire_state: Option<KeyExpiryManager>,
271
272 last_compaction_time: Option<Timestamp>,
274
275 estimated_size: usize,
277 last_size_update: Instant,
278 size_update_interval: tokio::time::Duration,
279}
280
281impl Arrangement {
282 fn compute_size(&self) -> usize {
283 self.spine
284 .values()
285 .map(|v| {
286 let per_entry_size = v
287 .first_key_value()
288 .map(|(k, v)| {
289 k.get_heap_size()
290 + v.len() * v.first().map(|r| r.get_heap_size()).unwrap_or(0)
291 })
292 .unwrap_or(0);
293 std::mem::size_of::<i64>() + v.len() * per_entry_size
294 })
295 .sum::<usize>()
296 + self.expire_state.get_heap_size()
297 + self.name.get_heap_size()
298 }
299
300 fn update_and_fetch_size(&mut self) -> usize {
301 if self.last_size_update.elapsed() > self.size_update_interval {
302 self.estimated_size = self.compute_size();
303 self.last_size_update = Instant::now();
304 }
305 self.estimated_size
306 }
307}
308
309impl GetSize for Arrangement {
310 fn get_heap_size(&self) -> usize {
311 self.estimated_size
312 }
313}
314
315impl Default for Arrangement {
316 fn default() -> Self {
317 Self {
318 spine: Default::default(),
319 full_arrangement: false,
320 is_written: false,
321 expire_state: None,
322 last_compaction_time: None,
323 name: Vec::new(),
324 estimated_size: 0,
325 last_size_update: Instant::now(),
326 size_update_interval: tokio::time::Duration::from_secs(3),
327 }
328 }
329}
330
331impl Arrangement {
332 pub fn new_with_name(name: Vec<String>) -> Self {
333 Self {
334 spine: Default::default(),
335 full_arrangement: false,
336 is_written: false,
337 expire_state: None,
338 last_compaction_time: None,
339 name,
340 estimated_size: 0,
341 last_size_update: Instant::now(),
342 size_update_interval: tokio::time::Duration::from_secs(3),
343 }
344 }
345
346 pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
347 self.expire_state.as_ref()
348 }
349
350 pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
351 self.expire_state = Some(expire_state);
352 }
353
354 pub fn apply_updates(
358 &mut self,
359 now: Timestamp,
360 updates: Vec<KeyValDiffRow>,
361 ) -> Result<Option<Duration>, EvalError> {
362 self.is_written = true;
363
364 let mut max_expired_by: Option<Duration> = None;
365
366 for ((key, val), update_ts, diff) in updates {
367 if let Some(s) = &mut self.expire_state {
369 if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? {
370 max_expired_by = max_expired_by.max(Some(expired_by));
371 trace!(
372 "Expired key: {:?}, expired by: {:?} with time being now={}",
373 key,
374 expired_by,
375 now
376 );
377 continue;
378 }
379 }
380
381 if self
383 .spine
384 .last_key_value()
385 .map(|(highest_ts, _)| *highest_ts < update_ts)
386 .unwrap_or(true)
387 {
388 self.spine.insert(update_ts, Default::default());
389 }
390
391 let (_, batch) = self
393 .spine
394 .range_mut(update_ts..)
395 .next()
396 .expect("Previous insert should have created the batch");
397
398 let key_updates = batch.entry(key).or_default();
399 key_updates.push((val, update_ts, diff));
400
401 key_updates.sort_by_key(|(_val, ts, _diff)| *ts);
404 }
405 self.update_and_fetch_size();
406 Ok(max_expired_by)
407 }
408
409 pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
411 for (_ts, batch) in self.spine.range((Bound::Excluded(now), Bound::Unbounded)) {
413 let min_ts = batch
414 .iter()
415 .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts).min())
416 .min();
417
418 if min_ts.is_some() {
419 return min_ts;
420 }
421 }
422
423 None
424 }
425
426 pub fn last_compaction_time(&self) -> Option<Timestamp> {
428 self.last_compaction_time
429 }
430
431 fn split_spine_le(&mut self, split_ts: &Timestamp) -> Spine {
433 self.split_batch_at(split_ts);
434 let mut before = self.spine.split_off(&(split_ts + 1));
435 std::mem::swap(&mut before, &mut self.spine);
436 before
437 }
438
439 fn split_batch_at(&mut self, split_ts: &Timestamp) {
441 if self.spine.contains_key(split_ts) {
445 return;
446 }
447
448 let Some((_, batch_to_split)) = self.spine.range_mut(split_ts..).next() else {
449 return; };
451
452 let mut new_batch = Batch::default();
456
457 batch_to_split.retain(|key, updates| {
458 let mut new_updates = SmallVec::default();
459
460 updates.retain(|(val, ts, diff)| {
461 if *ts <= *split_ts {
462 new_updates.push((val.clone(), *ts, *diff));
464 }
465 *ts > *split_ts
467 });
468
469 if !new_updates.is_empty() {
470 new_batch.insert(key.clone(), new_updates);
471 }
472
473 !updates.is_empty()
475 });
476
477 if !new_batch.is_empty() {
478 self.spine.insert(*split_ts, new_batch);
479 }
480 }
481
482 pub fn compact_to(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
486 let mut max_expired_by: Option<Duration> = None;
487
488 let batches_to_compact = self.split_spine_le(&now);
489 self.last_compaction_time = Some(now);
490
491 if !self.full_arrangement {
493 return Ok(None);
494 }
495
496 let mut compacting_batch = Batch::default();
498
499 for (_, batch) in batches_to_compact {
500 for (key, updates) in batch {
501 if let Some(s) = &mut self.expire_state {
503 if let Some(expired_by) =
504 s.get_expire_duration_and_update_event_ts(now, &key)?
505 {
506 max_expired_by = max_expired_by.max(Some(expired_by));
507 continue;
508 }
509 }
510
511 let mut row = compacting_batch
512 .remove(&key)
513 .and_then(|mut updates| updates.pop());
515
516 for update in updates {
517 row = compact_diff_row(row, &update);
518 }
519 if let Some(compacted_update) = row {
520 compacting_batch.insert(key, smallvec![compacted_update]);
521 }
522 }
523 }
524
525 self.spine.insert(now, compacting_batch);
527 self.update_and_fetch_size();
528 Ok(max_expired_by)
529 }
530
531 pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
533 &self,
534 range: R,
535 ) -> Vec<KeyValDiffRow> {
536 let batches = match range.end_bound() {
538 Bound::Included(t) => self.spine.range(range.clone()).chain(
539 self.spine
540 .range((Bound::Excluded(t), Bound::Unbounded))
541 .next(),
542 ),
543 Bound::Excluded(t) => self.spine.range(range.clone()).chain(
544 self.spine
545 .range((Bound::Included(t), Bound::Unbounded))
546 .next(),
547 ),
548 _ => self.spine.range(range.clone()).chain(None),
549 };
550
551 let mut res = vec![];
552 for (_, batch) in batches {
553 for (key, updates) in batch {
554 for (val, ts, diff) in updates {
555 if range.contains(ts) {
556 res.push(((key.clone(), val.clone()), *ts, *diff));
557 }
558 }
559 }
560 }
561 res
562 }
563
564 pub fn truncate_expired_keys(&mut self, now: Timestamp) {
566 if let Some(s) = &mut self.expire_state {
567 if let Some(expired_keys) = s.remove_expired_keys(now) {
568 for key in expired_keys {
569 for (_, batch) in self.spine.iter_mut() {
570 batch.remove(&key);
571 }
572 }
573 }
574 }
575 }
576
577 pub fn get(&self, now: Timestamp, key: &Row) -> Option<DiffRow> {
581 if let Some(last_compaction_time) = self.last_compaction_time()
586 && now <= last_compaction_time
587 && self.full_arrangement
588 {
589 return self
591 .spine
592 .get(&last_compaction_time)
593 .and_then(|batch| batch.get(key))
594 .and_then(|updates| updates.first().cloned());
595 }
596
597 let batches = if self.spine.contains_key(&now) {
602 self.spine.range(..=now).chain(None)
604 } else {
605 self.spine.range(..=now).chain(
607 self.spine
608 .range((Bound::Excluded(now), Bound::Unbounded))
609 .next(),
610 )
611 };
612
613 let mut final_val = None;
614 for (ts, batch) in batches {
615 if let Some(updates) = batch.get(key) {
616 if *ts <= now {
617 for update in updates {
618 final_val = compact_diff_row(final_val, update);
619 }
620 } else {
621 for update in updates.iter().filter(|(_, ts, _)| *ts <= now) {
622 final_val = compact_diff_row(final_val, update);
623 }
624 }
625 }
626 }
627 final_val
628 }
629}
630
631fn compact_diff_row(old_row: Option<DiffRow>, new_row: &DiffRow) -> Option<DiffRow> {
632 let (val, ts, diff) = new_row;
633 match (old_row, diff) {
634 (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff == 0 => {
635 None
637 }
638 (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff != 0 => {
639 Some((row, *ts, old_diff + *diff))
640 }
641 _ => Some((val.clone(), *ts, *diff)),
644 }
645}
646
647pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
649pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
651
652#[derive(Debug, Clone)]
654pub struct ArrangeHandler {
655 inner: Arc<RwLock<Arrangement>>,
656}
657
658impl ArrangeHandler {
659 pub fn from(arr: Arrangement) -> Self {
661 Self {
662 inner: Arc::new(RwLock::new(arr)),
663 }
664 }
665
666 pub fn write(&self) -> ArrangeWriter<'_> {
668 self.inner.blocking_write()
669 }
670
671 pub fn read(&self) -> ArrangeReader<'_> {
673 self.inner.blocking_read()
674 }
675
676 pub fn clone_future_only(&self) -> Option<Self> {
680 if self.read().is_written {
681 return None;
682 }
683 Some(Self {
684 inner: self.inner.clone(),
685 })
686 }
687
688 pub fn clone_full_arrange(&self) -> Option<Self> {
695 {
696 let zelf = self.read();
697 if !zelf.full_arrangement && zelf.is_written {
698 return None;
699 }
700 }
701
702 self.write().full_arrangement = true;
703 Some(Self {
704 inner: self.inner.clone(),
705 })
706 }
707
708 pub fn set_full_arrangement(&self, full: bool) {
709 self.write().full_arrangement = full;
710 }
711
712 pub fn is_full_arrangement(&self) -> bool {
713 self.read().full_arrangement
714 }
715}
716
717#[cfg(test)]
718mod test {
719 use std::borrow::Borrow;
720
721 use datatypes::value::Value;
722 use itertools::Itertools;
723
724 use super::*;
725
726 fn lit(v: impl Into<Value>) -> Row {
727 Row::new(vec![v.into()])
728 }
729
730 fn kv(key: impl Borrow<Row>, value: impl Borrow<Row>) -> (Row, Row) {
731 (key.borrow().clone(), value.borrow().clone())
732 }
733
734 #[test]
735 fn test_future_get() {
736 let arr = ArrangeHandler::from(Arrangement::default());
738
739 let mut arr = arr.write();
740
741 let key = lit("a");
742 let updates: Vec<KeyValDiffRow> = vec![
743 (kv(&key, lit("b")), 1 , 1 ),
744 (kv(&key, lit("c")), 2 , 1 ),
745 (kv(&key, lit("d")), 3 , 1 ),
746 ];
747
748 arr.apply_updates(0, updates).unwrap();
750
751 assert_eq!(arr.get(1, &key), Some((lit("b"), 1 , 1 )));
752 assert_eq!(arr.get(2, &key), Some((lit("c"), 2 , 1 )));
753 assert_eq!(arr.get(3, &key), Some((lit("d"), 3 , 1 )));
754 }
755
756 #[test]
757 fn only_save_future_updates() {
758 let arr = ArrangeHandler::from(Arrangement::default());
762
763 {
764 let arr1 = arr.clone_full_arrange();
765 assert!(arr1.is_some());
766 let arr2 = arr.clone_future_only();
767 assert!(arr2.is_some());
768 }
769
770 {
771 let mut arr = arr.write();
772 let updates: Vec<KeyValDiffRow> = vec![
773 (kv(lit("a"), lit("x")), 1 , 1 ),
774 (kv(lit("b"), lit("y")), 2 , 1 ),
775 (kv(lit("c"), lit("z")), 3 , 1 ),
776 ];
777 arr.apply_updates(0, updates).unwrap();
779
780 assert_eq!(
781 arr.get_updates_in_range(1..=1),
782 vec![(kv(lit("a"), lit("x")), 1 , 1 )]
783 );
784 assert_eq!(arr.spine.len(), 3);
785
786 arr.compact_to(1).unwrap();
787 assert_eq!(arr.spine.len(), 3);
788
789 let key = &lit("a");
790 assert_eq!(arr.get(3, key), Some((lit("x"), 1 , 1 )));
791 let key = &lit("b");
792 assert_eq!(arr.get(3, key), Some((lit("y"), 2 , 1 )));
793 let key = &lit("c");
794 assert_eq!(arr.get(3, key), Some((lit("z"), 3 , 1 )));
795 }
796
797 assert!(arr.clone_future_only().is_none());
798 {
799 let arr2 = arr.clone_full_arrange().unwrap();
800 let mut arr = arr2.write();
801 assert_eq!(arr.spine.len(), 3);
802
803 arr.compact_to(2).unwrap();
804 assert_eq!(arr.spine.len(), 2);
805 let key = &lit("a");
806 assert_eq!(arr.get(3, key), Some((lit("x"), 1 , 1 )));
807 let key = &lit("b");
808 assert_eq!(arr.get(3, key), Some((lit("y"), 2 , 1 )));
809 let key = &lit("c");
810 assert_eq!(arr.get(3, key), Some((lit("z"), 3 , 1 )));
811 }
812 }
813
814 #[test]
815 fn test_reduce_expire_keys() {
816 let mut arr = Arrangement::default();
817 let expire_state = KeyExpiryManager {
818 event_ts_to_key: Default::default(),
819 key_expiration_duration: Some(10),
820 event_timestamp_from_row: Some(ScalarExpr::Column(0)),
821 };
822 arr.expire_state = Some(expire_state);
823 arr.full_arrangement = true;
824
825 let arr = ArrangeHandler::from(arr);
826
827 let updates: Vec<KeyValDiffRow> = vec![
828 (kv(lit(1i64), lit("x")), 1 , 1 ),
829 (kv(lit(2i64), lit("y")), 2 , 1 ),
830 (kv(lit(3i64), lit("z")), 3 , 1 ),
831 ];
832 {
833 let mut arr = arr.write();
834 arr.apply_updates(0, updates.clone()).unwrap();
835 arr.apply_updates(0, updates).unwrap();
837
838 assert_eq!(
839 arr.get_updates_in_range(1..=1),
840 vec![
841 (kv(lit(1i64), lit("x")), 1 , 1 ),
842 (kv(lit(1i64), lit("x")), 1 , 1 )
843 ]
844 );
845 assert_eq!(arr.spine.len(), 3);
846 arr.compact_to(1).unwrap();
847 assert_eq!(arr.spine.len(), 3);
848 }
849
850 {
851 let mut arr = arr.write();
852 assert_eq!(arr.spine.len(), 3);
853
854 arr.truncate_expired_keys(11);
855 assert_eq!(arr.spine.len(), 3);
856 let key = &lit(1i64);
857 assert_eq!(arr.get(11, key), Some((lit("x"), 1 , 2 )));
858 let key = &lit(2i64);
859 assert_eq!(arr.get(11, key), Some((lit("y"), 2 , 2 )));
860 let key = &lit(3i64);
861 assert_eq!(arr.get(11, key), Some((lit("z"), 3 , 2 )));
862
863 arr.truncate_expired_keys(12);
864 assert_eq!(arr.spine.len(), 3);
865 let key = &lit(1i64);
866 assert_eq!(arr.get(12, key), None);
867 let key = &lit(2i64);
868 assert_eq!(arr.get(12, key), Some((lit("y"), 2 , 2 )));
869 let key = &lit(3i64);
870 assert_eq!(arr.get(12, key), Some((lit("z"), 3 , 2 )));
871 assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 2);
872
873 arr.truncate_expired_keys(13);
874 assert_eq!(arr.spine.len(), 3);
875 let key = &lit(1i64);
876 assert_eq!(arr.get(13, key), None);
877 let key = &lit(2i64);
878 assert_eq!(arr.get(13, key), None);
879 let key = &lit(3i64);
880 assert_eq!(arr.get(13, key), Some((lit("z"), 3 , 2 )));
881 assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 1);
882 }
883 }
884
885 #[test]
886 fn test_apply_expired_keys() {
887 let mut arr = Arrangement::default();
889 let expire_state = KeyExpiryManager {
890 event_ts_to_key: Default::default(),
891 key_expiration_duration: Some(10),
892 event_timestamp_from_row: Some(ScalarExpr::Column(0)),
893 };
894 arr.expire_state = Some(expire_state);
895
896 let arr = ArrangeHandler::from(arr);
897
898 let updates: Vec<KeyValDiffRow> = vec![
899 (kv(lit(1i64), lit("x")), 1 , 1 ),
900 (kv(lit(2i64), lit("y")), 2 , 1 ),
901 ];
902 {
903 let mut arr = arr.write();
904 let expired_by = arr.apply_updates(12, updates).unwrap();
905 assert_eq!(expired_by, Some(1));
906
907 let key = &lit(1i64);
908 assert_eq!(arr.get(12, key), None);
909 let key = &lit(2i64);
910 assert_eq!(arr.get(12, key), Some((lit("y"), 2 , 1 )));
911 }
912 }
913
914 #[test]
918 fn test_split_off() {
919 let mut arr = Arrangement::default();
920 arr.spine.insert(1, Batch::default());
922 arr.spine.insert(3, Batch::default());
923
924 let updates = vec![(kv(lit("a"), lit("x")), 2 , 1 )];
925 arr.apply_updates(2, updates).unwrap();
927
928 let mut arr1 = arr.clone();
929 {
930 assert_eq!(arr.get_next_update_time(&1), Some(2));
931 let split = &arr.split_spine_le(&2);
933 assert_eq!(split.len(), 2);
934 assert_eq!(split[&2].len(), 1);
935
936 assert_eq!(arr.get_next_update_time(&1), None);
937 }
938
939 {
940 let split = &arr1.split_spine_le(&1);
942 assert_eq!(split.len(), 1);
943 assert_eq!(split[&1].len(), 0);
944 }
945 }
946
947 #[test]
950 fn test_get_by_range() {
951 let mut arr = Arrangement::default();
952
953 let updates: Vec<KeyValDiffRow> = vec![
956 (kv(lit("a"), lit("")), 2 , 1 ),
957 (kv(lit("a"), lit("")), 1 , 1 ),
958 (kv(lit("b"), lit("")), 4 , 1 ),
959 (kv(lit("c"), lit("")), 3 , 1 ),
960 (kv(lit("c"), lit("")), 6 , 1 ),
961 (kv(lit("a"), lit("")), 5 , 1 ),
962 ];
963 arr.apply_updates(0, updates).unwrap();
964 assert_eq!(
965 arr.get_updates_in_range(2..=5),
966 vec![
967 (kv(lit("a"), lit("")), 2 , 1 ),
968 (kv(lit("b"), lit("")), 4 , 1 ),
969 (kv(lit("c"), lit("")), 3 , 1 ),
970 (kv(lit("a"), lit("")), 5 , 1 ),
971 ]
972 );
973 }
974
975 #[test]
978 fn test_get_unaligned() {
979 let mut arr = Arrangement::default();
980
981 let key = &lit("a");
984 let updates: Vec<KeyValDiffRow> = vec![
985 (kv(key, lit(1)), 2 , 1 ),
986 (kv(key, lit(2)), 1 , 1 ),
987 (kv(key, lit(3)), 4 , 1 ),
988 (kv(key, lit(4)), 3 , 1 ),
989 (kv(key, lit(5)), 6 , 1 ),
990 (kv(key, lit(6)), 5 , 1 ),
991 ];
992 arr.apply_updates(0, updates).unwrap();
993 assert_eq!(arr.get(2, key), Some((lit(1), 2 , 1 )));
995 assert_eq!(arr.get(3, key), Some((lit(4), 3 , 1 )));
997 }
998
999 #[test]
1001 fn test_out_of_order_apply_updates() {
1002 let mut arr = Arrangement::default();
1003
1004 let key = &lit("a");
1005 let updates: Vec<KeyValDiffRow> = vec![
1006 (kv(key, lit(5)), 6 , 1 ),
1007 (kv(key, lit(2)), 2 , -1 ),
1008 (kv(key, lit(1)), 2 , 1 ),
1009 (kv(key, lit(2)), 1 , 1 ),
1010 (kv(key, lit(3)), 4 , 1 ),
1011 (kv(key, lit(4)), 3 , 1 ),
1012 (kv(key, lit(6)), 5 , 1 ),
1013 ];
1014 arr.apply_updates(0, updates.clone()).unwrap();
1015 let sorted = updates
1016 .iter()
1017 .sorted_by_key(|(_, ts, _)| *ts)
1018 .cloned()
1019 .collect_vec();
1020 assert_eq!(arr.get_updates_in_range(1..7), sorted);
1021 }
1022
1023 #[test]
1024 fn test_full_arrangement_get_from_first_entry() {
1025 let mut arr = Arrangement::default();
1026 let updates = vec![
1028 (kv(lit("a"), lit("x")), 3 , 1 ),
1029 (kv(lit("b"), lit("y")), 1 , 1 ),
1030 (kv(lit("b"), lit("y")), 2 , -1 ),
1031 ];
1032 arr.apply_updates(0, updates).unwrap();
1033 assert_eq!(arr.get(2, &lit("b")), None );
1034 arr.full_arrangement = true;
1035 assert_eq!(arr.get(2, &lit("b")), None );
1036
1037 arr.compact_to(1).unwrap();
1038
1039 assert_eq!(
1040 arr.get(1, &lit("b")),
1041 Some((lit("y"), 1, 1)) );
1043 }
1044}