1use std::collections::{BTreeMap, BTreeSet};
18use std::ops::Bound;
19use std::sync::Arc;
20
21use common_meta::key::flow::flow_state::FlowStat;
22use common_telemetry::trace;
23use datatypes::value::Value;
24use get_size2::GetSize;
25use smallvec::{SmallVec, smallvec};
26use tokio::sync::{RwLock, mpsc, oneshot};
27use tokio::time::Instant;
28
29use crate::error::InternalSnafu;
30use crate::expr::{EvalError, ScalarExpr};
31use crate::repr::{DiffRow, Duration, KeyValDiffRow, Row, Timestamp, value_to_internal_ts};
32
/// A batch of updates: maps each key row to the short list of `DiffRow`
/// updates recorded for that key.
pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
35
36pub fn get_value_heap_size(v: &Value) -> usize {
38 match v {
39 Value::Binary(bin) => bin.len(),
40 Value::String(s) => s.len(),
41 Value::List(list) => list.items().iter().map(get_value_heap_size).sum(),
42 _ => 0,
43 }
44}
45
/// Requesting side of the size-report channel.
///
/// Each request sent through `inner` is a oneshot sender on which the
/// receiving side is expected to reply with a `FlowStat`.
#[derive(Clone)]
pub struct SizeReportSender {
    /// Channel carrying the reply handles of pending requests.
    inner: mpsc::Sender<oneshot::Sender<FlowStat>>,
}
50
51impl SizeReportSender {
52 pub fn new() -> (Self, StateReportHandler) {
53 let (tx, rx) = mpsc::channel(1);
54 let zelf = Self { inner: tx };
55 (zelf, rx)
56 }
57
58 pub async fn query(&self, timeout: std::time::Duration) -> crate::Result<FlowStat> {
60 let (tx, rx) = oneshot::channel();
61 self.inner.send(tx).await.map_err(|_| {
62 InternalSnafu {
63 reason: "failed to send size report request due to receiver dropped",
64 }
65 .build()
66 })?;
67 let timeout = tokio::time::timeout(timeout, rx);
68 timeout
69 .await
70 .map_err(|_elapsed| {
71 InternalSnafu {
72 reason: "failed to receive size report after one second timeout",
73 }
74 .build()
75 })?
76 .map_err(|_| {
77 InternalSnafu {
78 reason: "failed to receive size report due to sender dropped",
79 }
80 .build()
81 })
82 }
83}
84
/// Receiving side of the size-report channel created by
/// `SizeReportSender::new`; every received item is the oneshot sender on
/// which a `FlowStat` reply is awaited by the requester.
pub type StateReportHandler = mpsc::Receiver<oneshot::Sender<FlowStat>>;
87
/// Batches ordered by timestamp key. `Arrangement::apply_updates` stores an
/// update in the first batch whose key is greater than or equal to the
/// update's timestamp.
pub type Spine = BTreeMap<Timestamp, Batch>;
91
/// Tracks the event timestamp of keys so that keys older than a configured
/// duration can be detected and removed.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyExpiryManager {
    /// Event timestamp -> keys observed with that event timestamp.
    event_ts_to_key: BTreeMap<Timestamp, BTreeSet<Row>>,

    /// How far behind `now` an event timestamp may be before the key counts
    /// as expired; `None` disables expiration.
    key_expiration_duration: Option<Duration>,

    /// Expression evaluated against a key row to obtain its event timestamp;
    /// `None` means no event timestamp can be extracted.
    event_timestamp_from_row: Option<ScalarExpr>,
}
108
109impl GetSize for KeyExpiryManager {
110 fn get_heap_size(&self) -> usize {
111 let row_size = if let Some(row_size) = &self
112 .event_ts_to_key
113 .first_key_value()
114 .map(|(_, v)| v.first().get_heap_size())
115 {
116 *row_size
117 } else {
118 0
119 };
120 self.event_ts_to_key
121 .values()
122 .map(|v| v.len() * row_size + std::mem::size_of::<i64>())
123 .sum::<usize>()
124 }
125}
126
127impl KeyExpiryManager {
128 pub fn new(
129 key_expiration_duration: Option<Duration>,
130 event_timestamp_from_row: Option<ScalarExpr>,
131 ) -> Self {
132 Self {
133 event_ts_to_key: Default::default(),
134 key_expiration_duration,
135 event_timestamp_from_row,
136 }
137 }
138
139 pub fn extract_event_ts(&self, row: &Row) -> Result<Option<Timestamp>, EvalError> {
143 let ts = self
144 .event_timestamp_from_row
145 .as_ref()
146 .map(|e| e.eval(&row.inner))
147 .transpose()?
148 .map(value_to_internal_ts)
149 .transpose()?;
150 Ok(ts)
151 }
152
153 pub fn compute_expiration_timestamp(&self, now: Timestamp) -> Option<Timestamp> {
155 self.key_expiration_duration.map(|d| now - d)
156 }
157
158 pub fn get_expire_duration_and_update_event_ts(
163 &mut self,
164 now: Timestamp,
165 row: &Row,
166 ) -> Result<Option<Duration>, EvalError> {
167 let Some(event_ts) = self.extract_event_ts(row)? else {
168 return Ok(None);
169 };
170
171 self.event_ts_to_key
172 .entry(event_ts)
173 .or_default()
174 .insert(row.clone());
175
176 if let Some(expire_time) = self.compute_expiration_timestamp(now)
177 && expire_time > event_ts
178 {
179 return Ok(Some(expire_time - event_ts));
181 }
182
183 Ok(None)
184 }
185
186 pub fn get_expire_duration(
190 &self,
191 now: Timestamp,
192 row: &Row,
193 ) -> Result<Option<Duration>, EvalError> {
194 let Some(event_ts) = self.extract_event_ts(row)? else {
195 return Ok(None);
196 };
197
198 if let Some(expire_time) = self.compute_expiration_timestamp(now)
199 && expire_time > event_ts
200 {
201 return Ok(Some(expire_time - event_ts));
203 }
204
205 Ok(None)
206 }
207
208 pub fn remove_expired_keys(&mut self, now: Timestamp) -> Option<impl Iterator<Item = Row>> {
211 let expire_time = self.compute_expiration_timestamp(now)?;
212
213 let mut before = self.event_ts_to_key.split_off(&expire_time);
214 std::mem::swap(&mut before, &mut self.event_ts_to_key);
215
216 Some(before.into_iter().flat_map(|(_ts, keys)| keys.into_iter()))
217 }
218}
219
/// Time-ordered store of keyed updates with optional key expiration and a
/// periodically refreshed size estimate.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Arrangement {
    /// Name of the arrangement; included in the size estimate.
    name: Vec<String>,

    /// Batches of updates ordered by timestamp key.
    spine: Spine,

    /// When `true`, `compact_to` merges compacted batches into a single batch
    /// instead of discarding them.
    full_arrangement: bool,

    /// Set to `true` by `apply_updates`.
    is_written: bool,

    /// Optional key-expiration tracking.
    expire_state: Option<KeyExpiryManager>,

    /// The `now` passed to the most recent `compact_to` call, if any.
    last_compaction_time: Option<Timestamp>,

    /// Cached size estimate returned by `get_heap_size`.
    estimated_size: usize,
    /// When `estimated_size` was last recomputed.
    last_size_update: Instant,
    /// Minimum time between two recomputations of `estimated_size`.
    size_update_interval: tokio::time::Duration,
}
280
281impl Arrangement {
282 fn compute_size(&self) -> usize {
283 self.spine
284 .values()
285 .map(|v| {
286 let per_entry_size = v
287 .first_key_value()
288 .map(|(k, v)| {
289 k.get_heap_size()
290 + v.len() * v.first().map(|r| r.get_heap_size()).unwrap_or(0)
291 })
292 .unwrap_or(0);
293 std::mem::size_of::<i64>() + v.len() * per_entry_size
294 })
295 .sum::<usize>()
296 + self.expire_state.get_heap_size()
297 + self.name.get_heap_size()
298 }
299
300 fn update_and_fetch_size(&mut self) -> usize {
301 if self.last_size_update.elapsed() > self.size_update_interval {
302 self.estimated_size = self.compute_size();
303 self.last_size_update = Instant::now();
304 }
305 self.estimated_size
306 }
307}
308
impl GetSize for Arrangement {
    /// Returns the cached size estimate without recomputing it; the cache is
    /// refreshed by `update_and_fetch_size`.
    fn get_heap_size(&self) -> usize {
        self.estimated_size
    }
}
314
315impl Default for Arrangement {
316 fn default() -> Self {
317 Self {
318 spine: Default::default(),
319 full_arrangement: false,
320 is_written: false,
321 expire_state: None,
322 last_compaction_time: None,
323 name: Vec::new(),
324 estimated_size: 0,
325 last_size_update: Instant::now(),
326 size_update_interval: tokio::time::Duration::from_secs(3),
327 }
328 }
329}
330
331impl Arrangement {
332 pub fn new_with_name(name: Vec<String>) -> Self {
333 Self {
334 spine: Default::default(),
335 full_arrangement: false,
336 is_written: false,
337 expire_state: None,
338 last_compaction_time: None,
339 name,
340 estimated_size: 0,
341 last_size_update: Instant::now(),
342 size_update_interval: tokio::time::Duration::from_secs(3),
343 }
344 }
345
    /// Returns the key-expiration state, if one has been set.
    pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
        self.expire_state.as_ref()
    }
349
    /// Installs `expire_state`, replacing any previous key-expiration state.
    pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
        self.expire_state = Some(expire_state);
    }
353
354 pub fn apply_updates(
358 &mut self,
359 now: Timestamp,
360 updates: Vec<KeyValDiffRow>,
361 ) -> Result<Option<Duration>, EvalError> {
362 self.is_written = true;
363
364 let mut max_expired_by: Option<Duration> = None;
365
366 for ((key, val), update_ts, diff) in updates {
367 if let Some(s) = &mut self.expire_state
369 && let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)?
370 {
371 max_expired_by = max_expired_by.max(Some(expired_by));
372 trace!(
373 "Expired key: {:?}, expired by: {:?} with time being now={}",
374 key, expired_by, now
375 );
376 continue;
377 }
378
379 if self
381 .spine
382 .last_key_value()
383 .map(|(highest_ts, _)| *highest_ts < update_ts)
384 .unwrap_or(true)
385 {
386 self.spine.insert(update_ts, Default::default());
387 }
388
389 let (_, batch) = self
391 .spine
392 .range_mut(update_ts..)
393 .next()
394 .expect("Previous insert should have created the batch");
395
396 let key_updates = batch.entry(key).or_default();
397 key_updates.push((val, update_ts, diff));
398
399 key_updates.sort_by_key(|(_val, ts, _diff)| *ts);
402 }
403 self.update_and_fetch_size();
404 Ok(max_expired_by)
405 }
406
407 pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
409 for (_ts, batch) in self.spine.range((Bound::Excluded(now), Bound::Unbounded)) {
411 let min_ts = batch
412 .iter()
413 .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts).min())
414 .min();
415
416 if min_ts.is_some() {
417 return min_ts;
418 }
419 }
420
421 None
422 }
423
    /// Returns the `now` of the most recent `compact_to` call, or `None` if
    /// the arrangement has never been compacted.
    pub fn last_compaction_time(&self) -> Option<Timestamp> {
        self.last_compaction_time
    }
428
429 fn split_spine_le(&mut self, split_ts: &Timestamp) -> Spine {
431 self.split_batch_at(split_ts);
432 let mut before = self.spine.split_off(&(split_ts + 1));
433 std::mem::swap(&mut before, &mut self.spine);
434 before
435 }
436
437 fn split_batch_at(&mut self, split_ts: &Timestamp) {
439 if self.spine.contains_key(split_ts) {
443 return;
444 }
445
446 let Some((_, batch_to_split)) = self.spine.range_mut(split_ts..).next() else {
447 return; };
449
450 let mut new_batch = Batch::default();
454
455 batch_to_split.retain(|key, updates| {
456 let mut new_updates = SmallVec::default();
457
458 updates.retain(|(val, ts, diff)| {
459 if *ts <= *split_ts {
460 new_updates.push((val.clone(), *ts, *diff));
462 }
463 *ts > *split_ts
465 });
466
467 if !new_updates.is_empty() {
468 new_batch.insert(key.clone(), new_updates);
469 }
470
471 !updates.is_empty()
473 });
474
475 if !new_batch.is_empty() {
476 self.spine.insert(*split_ts, new_batch);
477 }
478 }
479
480 pub fn compact_to(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
484 let mut max_expired_by: Option<Duration> = None;
485
486 let batches_to_compact = self.split_spine_le(&now);
487 self.last_compaction_time = Some(now);
488
489 if !self.full_arrangement {
491 return Ok(None);
492 }
493
494 let mut compacting_batch = Batch::default();
496
497 for (_, batch) in batches_to_compact {
498 for (key, updates) in batch {
499 if let Some(s) = &mut self.expire_state
501 && let Some(expired_by) =
502 s.get_expire_duration_and_update_event_ts(now, &key)?
503 {
504 max_expired_by = max_expired_by.max(Some(expired_by));
505 continue;
506 }
507
508 let mut row = compacting_batch
509 .remove(&key)
510 .and_then(|mut updates| updates.pop());
512
513 for update in updates {
514 row = compact_diff_row(row, &update);
515 }
516 if let Some(compacted_update) = row {
517 compacting_batch.insert(key, smallvec![compacted_update]);
518 }
519 }
520 }
521
522 self.spine.insert(now, compacting_batch);
524 self.update_and_fetch_size();
525 Ok(max_expired_by)
526 }
527
528 pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
530 &self,
531 range: R,
532 ) -> Vec<KeyValDiffRow> {
533 let batches = match range.end_bound() {
535 Bound::Included(t) => self.spine.range(range.clone()).chain(
536 self.spine
537 .range((Bound::Excluded(t), Bound::Unbounded))
538 .next(),
539 ),
540 Bound::Excluded(t) => self.spine.range(range.clone()).chain(
541 self.spine
542 .range((Bound::Included(t), Bound::Unbounded))
543 .next(),
544 ),
545 _ => self.spine.range(range.clone()).chain(None),
546 };
547
548 let mut res = vec![];
549 for (_, batch) in batches {
550 for (key, updates) in batch {
551 for (val, ts, diff) in updates {
552 if range.contains(ts) {
553 res.push(((key.clone(), val.clone()), *ts, *diff));
554 }
555 }
556 }
557 }
558 res
559 }
560
561 pub fn truncate_expired_keys(&mut self, now: Timestamp) {
563 if let Some(s) = &mut self.expire_state
564 && let Some(expired_keys) = s.remove_expired_keys(now)
565 {
566 for key in expired_keys {
567 for (_, batch) in self.spine.iter_mut() {
568 batch.remove(&key);
569 }
570 }
571 }
572 }
573
574 pub fn get(&self, now: Timestamp, key: &Row) -> Option<DiffRow> {
578 if let Some(last_compaction_time) = self.last_compaction_time()
583 && now <= last_compaction_time
584 && self.full_arrangement
585 {
586 return self
588 .spine
589 .get(&last_compaction_time)
590 .and_then(|batch| batch.get(key))
591 .and_then(|updates| updates.first().cloned());
592 }
593
594 let batches = if self.spine.contains_key(&now) {
599 self.spine.range(..=now).chain(None)
601 } else {
602 self.spine.range(..=now).chain(
604 self.spine
605 .range((Bound::Excluded(now), Bound::Unbounded))
606 .next(),
607 )
608 };
609
610 let mut final_val = None;
611 for (ts, batch) in batches {
612 if let Some(updates) = batch.get(key) {
613 if *ts <= now {
614 for update in updates {
615 final_val = compact_diff_row(final_val, update);
616 }
617 } else {
618 for update in updates.iter().filter(|(_, ts, _)| *ts <= now) {
619 final_val = compact_diff_row(final_val, update);
620 }
621 }
622 }
623 }
624 final_val
625 }
626}
627
628fn compact_diff_row(old_row: Option<DiffRow>, new_row: &DiffRow) -> Option<DiffRow> {
629 let (val, ts, diff) = new_row;
630 match (old_row, diff) {
631 (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff == 0 => {
632 None
634 }
635 (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff != 0 => {
636 Some((row, *ts, old_diff + *diff))
637 }
638 _ => Some((val.clone(), *ts, *diff)),
641 }
642}
643
/// Read guard over an [`Arrangement`], as returned by `ArrangeHandler::read`.
pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
/// Write guard over an [`Arrangement`], as returned by `ArrangeHandler::write`.
pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
648
/// Shared handle to an [`Arrangement`]; clones of a handle refer to the same
/// underlying arrangement.
#[derive(Debug, Clone)]
pub struct ArrangeHandler {
    /// The shared arrangement behind a read-write lock.
    inner: Arc<RwLock<Arrangement>>,
}
654
655impl ArrangeHandler {
656 pub fn from(arr: Arrangement) -> Self {
658 Self {
659 inner: Arc::new(RwLock::new(arr)),
660 }
661 }
662
663 pub fn write(&self) -> ArrangeWriter<'_> {
665 self.inner.blocking_write()
666 }
667
668 pub fn read(&self) -> ArrangeReader<'_> {
670 self.inner.blocking_read()
671 }
672
673 pub fn clone_future_only(&self) -> Option<Self> {
677 if self.read().is_written {
678 return None;
679 }
680 Some(Self {
681 inner: self.inner.clone(),
682 })
683 }
684
685 pub fn clone_full_arrange(&self) -> Option<Self> {
692 {
693 let zelf = self.read();
694 if !zelf.full_arrangement && zelf.is_written {
695 return None;
696 }
697 }
698
699 self.write().full_arrangement = true;
700 Some(Self {
701 inner: self.inner.clone(),
702 })
703 }
704
705 pub fn set_full_arrangement(&self, full: bool) {
706 self.write().full_arrangement = full;
707 }
708
709 pub fn is_full_arrangement(&self) -> bool {
710 self.read().full_arrangement
711 }
712}
713
714#[cfg(test)]
715mod test {
716 use std::borrow::Borrow;
717
718 use datatypes::value::Value;
719 use itertools::Itertools;
720
721 use super::*;
722
    /// Builds a single-column row from `v`.
    fn lit(v: impl Into<Value>) -> Row {
        Row::new(vec![v.into()])
    }
726
    /// Builds an owned `(key, value)` pair by cloning the given rows.
    fn kv(key: impl Borrow<Row>, value: impl Borrow<Row>) -> (Row, Row) {
        (key.borrow().clone(), value.borrow().clone())
    }
730
    /// Updates applied with timestamps after `now` can be read back at their
    /// own timestamps.
    #[test]
    fn test_future_get() {
        let arr = ArrangeHandler::from(Arrangement::default());

        let mut arr = arr.write();

        let key = lit("a");
        // Each entry is ((key, value), timestamp, diff).
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(&key, lit("b")), 1, 1),
            (kv(&key, lit("c")), 2, 1),
            (kv(&key, lit("d")), 3, 1),
        ];

        // All updates lie after `now = 0`.
        arr.apply_updates(0, updates).unwrap();

        // At each timestamp the most recent value for the key is visible.
        assert_eq!(arr.get(1, &key), Some((lit("b"), 1, 1)));
        assert_eq!(arr.get(2, &key), Some((lit("c"), 2, 1)));
        assert_eq!(arr.get(3, &key), Some((lit("d"), 3, 1)));
    }
752
    /// Exercises handle cloning rules and compaction of a full arrangement.
    #[test]
    fn only_save_future_updates() {
        let arr = ArrangeHandler::from(Arrangement::default());

        {
            // Nothing has been written yet, so both kinds of clone succeed.
            let arr1 = arr.clone_full_arrange();
            assert!(arr1.is_some());
            let arr2 = arr.clone_future_only();
            assert!(arr2.is_some());
        }

        {
            let mut arr = arr.write();
            // Each entry is ((key, value), timestamp, diff).
            let updates: Vec<KeyValDiffRow> = vec![
                (kv(lit("a"), lit("x")), 1, 1),
                (kv(lit("b"), lit("y")), 2, 1),
                (kv(lit("c"), lit("z")), 3, 1),
            ];
            arr.apply_updates(0, updates).unwrap();

            assert_eq!(
                arr.get_updates_in_range(1..=1),
                vec![(kv(lit("a"), lit("x")), 1, 1)]
            );
            // One batch per distinct timestamp.
            assert_eq!(arr.spine.len(), 3);

            // Compacting to 1 replaces the batch at 1 with the compacted one,
            // so the number of batches is unchanged.
            arr.compact_to(1).unwrap();
            assert_eq!(arr.spine.len(), 3);

            let key = &lit("a");
            assert_eq!(arr.get(3, key), Some((lit("x"), 1, 1)));
            let key = &lit("b");
            assert_eq!(arr.get(3, key), Some((lit("y"), 2, 1)));
            let key = &lit("c");
            assert_eq!(arr.get(3, key), Some((lit("z"), 3, 1)));
        }

        // The arrangement has been written, so a future-only clone is refused.
        assert!(arr.clone_future_only().is_none());
        {
            // It is already a full arrangement, so a full clone still works.
            let arr2 = arr.clone_full_arrange().unwrap();
            let mut arr = arr2.write();
            assert_eq!(arr.spine.len(), 3);

            // Batches 1 and 2 are merged into a single batch at 2.
            arr.compact_to(2).unwrap();
            assert_eq!(arr.spine.len(), 2);
            let key = &lit("a");
            assert_eq!(arr.get(3, key), Some((lit("x"), 1, 1)));
            let key = &lit("b");
            assert_eq!(arr.get(3, key), Some((lit("y"), 2, 1)));
            let key = &lit("c");
            assert_eq!(arr.get(3, key), Some((lit("z"), 3, 1)));
        }
    }
810
    /// Keys are removed by `truncate_expired_keys` once their event timestamp
    /// falls behind the expiration cut-off.
    #[test]
    fn test_reduce_expire_keys() {
        let mut arr = Arrangement::default();
        // Keys expire 10 ticks after their event timestamp, which is read
        // from column 0 of the key.
        let expire_state = KeyExpiryManager {
            event_ts_to_key: Default::default(),
            key_expiration_duration: Some(10),
            event_timestamp_from_row: Some(ScalarExpr::Column(0)),
        };
        arr.expire_state = Some(expire_state);
        arr.full_arrangement = true;

        let arr = ArrangeHandler::from(arr);

        // Each entry is ((key, value), timestamp, diff).
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(lit(1i64), lit("x")), 1, 1),
            (kv(lit(2i64), lit("y")), 2, 1),
            (kv(lit(3i64), lit("z")), 3, 1),
        ];
        {
            let mut arr = arr.write();
            // Apply the same updates twice, so every key carries two updates.
            arr.apply_updates(0, updates.clone()).unwrap();
            arr.apply_updates(0, updates).unwrap();

            assert_eq!(
                arr.get_updates_in_range(1..=1),
                vec![
                    (kv(lit(1i64), lit("x")), 1, 1),
                    (kv(lit(1i64), lit("x")), 1, 1)
                ]
            );
            assert_eq!(arr.spine.len(), 3);
            arr.compact_to(1).unwrap();
            assert_eq!(arr.spine.len(), 3);
        }

        {
            let mut arr = arr.write();
            assert_eq!(arr.spine.len(), 3);

            // At now = 11 no key has been removed; the duplicated updates are
            // merged into a diff of 2.
            arr.truncate_expired_keys(11);
            assert_eq!(arr.spine.len(), 3);
            let key = &lit(1i64);
            assert_eq!(arr.get(11, key), Some((lit("x"), 1, 2)));
            let key = &lit(2i64);
            assert_eq!(arr.get(11, key), Some((lit("y"), 2, 2)));
            let key = &lit(3i64);
            assert_eq!(arr.get(11, key), Some((lit("z"), 3, 2)));

            // At now = 12 key 1 is gone.
            arr.truncate_expired_keys(12);
            assert_eq!(arr.spine.len(), 3);
            let key = &lit(1i64);
            assert_eq!(arr.get(12, key), None);
            let key = &lit(2i64);
            assert_eq!(arr.get(12, key), Some((lit("y"), 2, 2)));
            let key = &lit(3i64);
            assert_eq!(arr.get(12, key), Some((lit("z"), 3, 2)));
            assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 2);

            // At now = 13 key 2 is gone as well.
            arr.truncate_expired_keys(13);
            assert_eq!(arr.spine.len(), 3);
            let key = &lit(1i64);
            assert_eq!(arr.get(13, key), None);
            let key = &lit(2i64);
            assert_eq!(arr.get(13, key), None);
            let key = &lit(3i64);
            assert_eq!(arr.get(13, key), Some((lit("z"), 3, 2)));
            assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 1);
        }
    }
881
    /// Updates for keys that are already expired at `now` are not applied,
    /// and the amount by which they are expired is reported.
    #[test]
    fn test_apply_expired_keys() {
        let mut arr = Arrangement::default();
        // Keys expire 10 ticks after their event timestamp, which is read
        // from column 0 of the key.
        let expire_state = KeyExpiryManager {
            event_ts_to_key: Default::default(),
            key_expiration_duration: Some(10),
            event_timestamp_from_row: Some(ScalarExpr::Column(0)),
        };
        arr.expire_state = Some(expire_state);

        let arr = ArrangeHandler::from(arr);

        // Each entry is ((key, value), timestamp, diff).
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(lit(1i64), lit("x")), 1, 1),
            (kv(lit(2i64), lit("y")), 2, 1),
        ];
        {
            let mut arr = arr.write();
            // With now = 12 the cut-off is 2: key 1 is expired by 1 tick,
            // key 2 is not expired.
            let expired_by = arr.apply_updates(12, updates).unwrap();
            assert_eq!(expired_by, Some(1));

            let key = &lit(1i64);
            assert_eq!(arr.get(12, key), None);
            let key = &lit(2i64);
            assert_eq!(arr.get(12, key), Some((lit("y"), 2, 1)));
        }
    }
910
    /// `split_spine_le` splits a batch when the split timestamp falls inside
    /// it and returns everything at or before the split timestamp.
    #[test]
    fn test_split_off() {
        let mut arr = Arrangement::default();
        // Start with two empty batches keyed 1 and 3.
        arr.spine.insert(1, Batch::default());
        arr.spine.insert(3, Batch::default());

        // The update at timestamp 2 is stored in the batch keyed 3.
        let updates = vec![(kv(lit("a"), lit("x")), 2, 1)];
        arr.apply_updates(2, updates).unwrap();

        let mut arr1 = arr.clone();
        {
            assert_eq!(arr.get_next_update_time(&1), Some(2));
            // Splitting at 2 creates a batch keyed 2 holding the update and
            // returns it together with the batch keyed 1.
            let split = &arr.split_spine_le(&2);
            assert_eq!(split.len(), 2);
            assert_eq!(split[&2].len(), 1);

            // Only the now-empty batch keyed 3 is left.
            assert_eq!(arr.get_next_update_time(&1), None);
        }

        {
            // Splitting at an existing batch key returns just that batch.
            let split = &arr1.split_spine_le(&1);
            assert_eq!(split.len(), 1);
            assert_eq!(split[&1].len(), 0);
        }
    }
943
    /// `get_updates_in_range` returns exactly the updates whose timestamp is
    /// in the range, even when they are stored in a batch keyed outside it.
    #[test]
    fn test_get_by_range() {
        let mut arr = Arrangement::default();

        // Each entry is ((key, value), timestamp, diff); timestamps are
        // deliberately not in ascending order.
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(lit("a"), lit("")), 2, 1),
            (kv(lit("a"), lit("")), 1, 1),
            (kv(lit("b"), lit("")), 4, 1),
            (kv(lit("c"), lit("")), 3, 1),
            (kv(lit("c"), lit("")), 6, 1),
            (kv(lit("a"), lit("")), 5, 1),
        ];
        arr.apply_updates(0, updates).unwrap();
        // Results come batch by batch, and within a batch in key order.
        assert_eq!(
            arr.get_updates_in_range(2..=5),
            vec![
                (kv(lit("a"), lit("")), 2, 1),
                (kv(lit("b"), lit("")), 4, 1),
                (kv(lit("c"), lit("")), 3, 1),
                (kv(lit("a"), lit("")), 5, 1),
            ]
        );
    }
971
    /// `get` works both when `now` is a batch key and when it falls inside a
    /// batch.
    #[test]
    fn test_get_unaligned() {
        let mut arr = Arrangement::default();

        let key = &lit("a");
        // Each entry is ((key, value), timestamp, diff); timestamps are
        // deliberately not in ascending order.
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(key, lit(1)), 2, 1),
            (kv(key, lit(2)), 1, 1),
            (kv(key, lit(3)), 4, 1),
            (kv(key, lit(4)), 3, 1),
            (kv(key, lit(5)), 6, 1),
            (kv(key, lit(6)), 5, 1),
        ];
        arr.apply_updates(0, updates).unwrap();
        // 2 is a batch key.
        assert_eq!(arr.get(2, key), Some((lit(1), 2, 1)));
        // 3 is not a batch key; the update at 3 is stored in the batch keyed 4.
        assert_eq!(arr.get(3, key), Some((lit(4), 3, 1)));
    }
995
    /// Updates applied out of timestamp order are returned sorted by
    /// timestamp, with ties kept in insertion order.
    #[test]
    fn test_out_of_order_apply_updates() {
        let mut arr = Arrangement::default();

        let key = &lit("a");
        // Each entry is ((key, value), timestamp, diff).
        let updates: Vec<KeyValDiffRow> = vec![
            (kv(key, lit(5)), 6, 1),
            (kv(key, lit(2)), 2, -1),
            (kv(key, lit(1)), 2, 1),
            (kv(key, lit(2)), 1, 1),
            (kv(key, lit(3)), 4, 1),
            (kv(key, lit(4)), 3, 1),
            (kv(key, lit(6)), 5, 1),
        ];
        arr.apply_updates(0, updates.clone()).unwrap();
        // Expected order: the input sorted by timestamp.
        let sorted = updates
            .iter()
            .sorted_by_key(|(_, ts, _)| *ts)
            .cloned()
            .collect_vec();
        assert_eq!(arr.get_updates_in_range(1..7), sorted);
    }
1019
    /// After compaction, a full arrangement answers `get` at the compaction
    /// time from the compacted batch.
    #[test]
    fn test_full_arrangement_get_from_first_entry() {
        let mut arr = Arrangement::default();
        // Each entry is ((key, value), timestamp, diff); key "b" is inserted
        // at 1 and retracted at 2.
        let updates = vec![
            (kv(lit("a"), lit("x")), 3, 1),
            (kv(lit("b"), lit("y")), 1, 1),
            (kv(lit("b"), lit("y")), 2, -1),
        ];
        arr.apply_updates(0, updates).unwrap();
        // At 2 the insert and the retraction cancel out.
        assert_eq!(arr.get(2, &lit("b")), None);
        arr.full_arrangement = true;
        assert_eq!(arr.get(2, &lit("b")), None);

        arr.compact_to(1).unwrap();

        // At the compaction time only the insert at 1 is visible.
        assert_eq!(arr.get(1, &lit("b")), Some((lit("y"), 1, 1)));
    }
1041}