flow/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! utilities for managing state of dataflow execution
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::ops::Bound;
19use std::sync::Arc;
20
21use common_meta::key::flow::flow_state::FlowStat;
22use common_telemetry::trace;
23use datatypes::value::Value;
24use get_size2::GetSize;
25use smallvec::{smallvec, SmallVec};
26use tokio::sync::{mpsc, oneshot, RwLock};
27use tokio::time::Instant;
28
29use crate::error::InternalSnafu;
30use crate::expr::{EvalError, ScalarExpr};
31use crate::repr::{value_to_internal_ts, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
32
33/// A batch of updates, arranged by key
34pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
35
36/// Get a estimate of heap size of a value
37pub fn get_value_heap_size(v: &Value) -> usize {
38    match v {
39        Value::Binary(bin) => bin.len(),
40        Value::String(s) => s.len(),
41        Value::List(list) => list.items().iter().map(get_value_heap_size).sum(),
42        _ => 0,
43    }
44}
45
46#[derive(Clone)]
47pub struct SizeReportSender {
48    inner: mpsc::Sender<oneshot::Sender<FlowStat>>,
49}
50
51impl SizeReportSender {
52    pub fn new() -> (Self, StateReportHandler) {
53        let (tx, rx) = mpsc::channel(1);
54        let zelf = Self { inner: tx };
55        (zelf, rx)
56    }
57
58    /// Query the size report, will timeout after one second if no response
59    pub async fn query(&self, timeout: std::time::Duration) -> crate::Result<FlowStat> {
60        let (tx, rx) = oneshot::channel();
61        self.inner.send(tx).await.map_err(|_| {
62            InternalSnafu {
63                reason: "failed to send size report request due to receiver dropped",
64            }
65            .build()
66        })?;
67        let timeout = tokio::time::timeout(timeout, rx);
68        timeout
69            .await
70            .map_err(|_elapsed| {
71                InternalSnafu {
72                    reason: "failed to receive size report after one second timeout",
73                }
74                .build()
75            })?
76            .map_err(|_| {
77                InternalSnafu {
78                    reason: "failed to receive size report due to sender dropped",
79                }
80                .build()
81            })
82    }
83}
84
85/// Handle the size report request, and send the report back
86pub type StateReportHandler = mpsc::Receiver<oneshot::Sender<FlowStat>>;
87
/// A spine of batches, arranged by timestamp
/// TODO(discord9): consider internally indexing by key, value, and timestamp for faster lookup
90pub type Spine = BTreeMap<Timestamp, Batch>;
91
92/// Determine when should a key expire according to it's event timestamp in key.
93///
94/// If a key is expired, any future updates to it should be ignored.
95///
96/// Note that key is expired by it's event timestamp (contained in the key), not by the time it's inserted (system timestamp).
97#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
98pub struct KeyExpiryManager {
99    /// A map from event timestamp to key, used for expire keys.
100    event_ts_to_key: BTreeMap<Timestamp, BTreeSet<Row>>,
101
102    /// Duration after which a key is considered expired, and will be removed from state
103    key_expiration_duration: Option<Duration>,
104
105    /// Expression to get timestamp from key row
106    event_timestamp_from_row: Option<ScalarExpr>,
107}
108
109impl GetSize for KeyExpiryManager {
110    fn get_heap_size(&self) -> usize {
111        let row_size = if let Some(row_size) = &self
112            .event_ts_to_key
113            .first_key_value()
114            .map(|(_, v)| v.first().get_heap_size())
115        {
116            *row_size
117        } else {
118            0
119        };
120        self.event_ts_to_key
121            .values()
122            .map(|v| v.len() * row_size + std::mem::size_of::<i64>())
123            .sum::<usize>()
124    }
125}
126
127impl KeyExpiryManager {
128    pub fn new(
129        key_expiration_duration: Option<Duration>,
130        event_timestamp_from_row: Option<ScalarExpr>,
131    ) -> Self {
132        Self {
133            event_ts_to_key: Default::default(),
134            key_expiration_duration,
135            event_timestamp_from_row,
136        }
137    }
138
139    /// Extract event timestamp from key row.
140    ///
141    /// If no expire state is set, return None.
142    pub fn extract_event_ts(&self, row: &Row) -> Result<Option<Timestamp>, EvalError> {
143        let ts = self
144            .event_timestamp_from_row
145            .as_ref()
146            .map(|e| e.eval(&row.inner))
147            .transpose()?
148            .map(value_to_internal_ts)
149            .transpose()?;
150        Ok(ts)
151    }
152
153    /// Return timestamp that should be expired by the time `now` by compute `now - expiration_duration`
154    pub fn compute_expiration_timestamp(&self, now: Timestamp) -> Option<Timestamp> {
155        self.key_expiration_duration.map(|d| now - d)
156    }
157
158    /// Update the event timestamp to key mapping.
159    ///
160    /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired.
161    /// - If it's not expired, return None
162    pub fn get_expire_duration_and_update_event_ts(
163        &mut self,
164        now: Timestamp,
165        row: &Row,
166    ) -> Result<Option<Duration>, EvalError> {
167        let Some(event_ts) = self.extract_event_ts(row)? else {
168            return Ok(None);
169        };
170
171        self.event_ts_to_key
172            .entry(event_ts)
173            .or_default()
174            .insert(row.clone());
175
176        if let Some(expire_time) = self.compute_expiration_timestamp(now) {
177            if expire_time > event_ts {
178                // return how much time it's expired
179                return Ok(Some(expire_time - event_ts));
180            }
181        }
182
183        Ok(None)
184    }
185
186    /// Get the expire duration of a key, if it's expired by now.
187    ///
188    /// Return None if the key is not expired
189    pub fn get_expire_duration(
190        &self,
191        now: Timestamp,
192        row: &Row,
193    ) -> Result<Option<Duration>, EvalError> {
194        let Some(event_ts) = self.extract_event_ts(row)? else {
195            return Ok(None);
196        };
197
198        if let Some(expire_time) = self.compute_expiration_timestamp(now) {
199            if expire_time > event_ts {
200                // return how much time it's expired
201                return Ok(Some(expire_time - event_ts));
202            }
203        }
204
205        Ok(None)
206    }
207
208    /// Remove expired keys from the state, and return an iterator of removed keys with
209    /// event_ts less than expire time (i.e. now - key_expiration_duration).
210    pub fn remove_expired_keys(&mut self, now: Timestamp) -> Option<impl Iterator<Item = Row>> {
211        let expire_time = self.compute_expiration_timestamp(now)?;
212
213        let mut before = self.event_ts_to_key.split_off(&expire_time);
214        std::mem::swap(&mut before, &mut self.event_ts_to_key);
215
216        Some(before.into_iter().flat_map(|(_ts, keys)| keys.into_iter()))
217    }
218}
219
220/// A shared state of key-value pair for various state in dataflow execution.
221///
222/// i.e: Mfp operator with temporal filter need to store it's future output so that it can add now, and delete later.
223/// To get all needed updates in a time span, use [`get_updates_in_range`].
224///
225/// And reduce operator need full state of it's output, so that it can query (and modify by calling [`apply_updates`])
226/// existing state, also need a way to expire keys. To get a key's current value, use [`get`] with time being `now`
227/// so it's like:
228/// `mfp operator -> arrange(store futures only, no expire) -> reduce operator <-> arrange(full, with key expiring time) -> output`
229///
230/// Note the two way arrow between reduce operator and arrange, it's because reduce operator need to query existing state
231/// and also need to update existing state.
232#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
233pub struct Arrangement {
234    /// A name or identifier for the arrangement which can be used for debugging or logging purposes.
235    /// This field is not critical to the functionality but aids in monitoring and management of arrangements.
236    name: Vec<String>,
237
238    /// Manages a collection of pending updates in a `BTreeMap` where each key is a timestamp and each value is a `Batch` of updates.
239    /// Updates are grouped into batched based on their timestamps.
240    /// Each batch covers a range of time from the last key (exclusive) to the current key (inclusive).
241    ///
242    /// - Updates with a timestamp (`update_ts`) that falls between two keys are placed in the batch of the higher key.
243    ///   For example, if the keys are `1, 5, 7, 9` and `update_ts` is `6`, the update goes into the batch with key `7`.
244    /// - Updates with a timestamp before the first key are categorized under the first key.
245    /// - Updates with a timestamp greater than the highest key result in a new batch being created with that timestamp as the key.
246    ///
247    /// The first key represents the current state and includes consolidated updates from the past. It is always set to `now`.
248    /// Each key should have only one update per batch with a `diff=1` for the batch representing the current time (`now`).
249    ///
250    /// Since updates typically occur as a delete followed by an insert, a small vector of size 2 is used to store updates for efficiency.
251    ///
252    /// TODO(discord9): Consider balancing the batch size?
253    spine: Spine,
254
255    /// Indicates whether the arrangement maintains a complete history of updates.
256    /// - `true`: Maintains all past and future updates, necessary for full state reconstruction at any point in time.
257    /// - `false`: Only future updates are retained, optimizing for scenarios where past state is irrelevant and conserving resources.
258    ///            Useful for case like `map -> arrange -> reduce`.
259    full_arrangement: bool,
260
261    /// Indicates whether the arrangement has been modified since its creation.
262    /// - `true`: The arrangement has been written to, meaning it has received updates.
263    ///           Cloning this arrangement is generally unsafe as it may lead to inconsistencies if the clone is modified independently.
264    ///           However, cloning is safe when both the original and the clone require a full arrangement, as this ensures consistency.
265    /// - `false`: The arrangement is in its initial state and has not been modified. It can be safely cloned and shared
266    ///            without concerns of carrying over unintended state changes.
267    is_written: bool,
268
269    /// Manage the expire state of the arrangement.
270    expire_state: Option<KeyExpiryManager>,
271
272    /// The time that the last compaction happened, also known as the current time.
273    last_compaction_time: Option<Timestamp>,
274
275    /// Estimated size of the arrangement in heap size.
276    estimated_size: usize,
277    last_size_update: Instant,
278    size_update_interval: tokio::time::Duration,
279}
280
281impl Arrangement {
282    fn compute_size(&self) -> usize {
283        self.spine
284            .values()
285            .map(|v| {
286                let per_entry_size = v
287                    .first_key_value()
288                    .map(|(k, v)| {
289                        k.get_heap_size()
290                            + v.len() * v.first().map(|r| r.get_heap_size()).unwrap_or(0)
291                    })
292                    .unwrap_or(0);
293                std::mem::size_of::<i64>() + v.len() * per_entry_size
294            })
295            .sum::<usize>()
296            + self.expire_state.get_heap_size()
297            + self.name.get_heap_size()
298    }
299
300    fn update_and_fetch_size(&mut self) -> usize {
301        if self.last_size_update.elapsed() > self.size_update_interval {
302            self.estimated_size = self.compute_size();
303            self.last_size_update = Instant::now();
304        }
305        self.estimated_size
306    }
307}
308
309impl GetSize for Arrangement {
310    fn get_heap_size(&self) -> usize {
311        self.estimated_size
312    }
313}
314
315impl Default for Arrangement {
316    fn default() -> Self {
317        Self {
318            spine: Default::default(),
319            full_arrangement: false,
320            is_written: false,
321            expire_state: None,
322            last_compaction_time: None,
323            name: Vec::new(),
324            estimated_size: 0,
325            last_size_update: Instant::now(),
326            size_update_interval: tokio::time::Duration::from_secs(3),
327        }
328    }
329}
330
331impl Arrangement {
332    pub fn new_with_name(name: Vec<String>) -> Self {
333        Self {
334            spine: Default::default(),
335            full_arrangement: false,
336            is_written: false,
337            expire_state: None,
338            last_compaction_time: None,
339            name,
340            estimated_size: 0,
341            last_size_update: Instant::now(),
342            size_update_interval: tokio::time::Duration::from_secs(3),
343        }
344    }
345
346    pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
347        self.expire_state.as_ref()
348    }
349
350    pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
351        self.expire_state = Some(expire_state);
352    }
353
354    /// Apply updates into spine, with no respect of whether the updates are in futures, past, or now.
355    ///
356    /// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired.
357    pub fn apply_updates(
358        &mut self,
359        now: Timestamp,
360        updates: Vec<KeyValDiffRow>,
361    ) -> Result<Option<Duration>, EvalError> {
362        self.is_written = true;
363
364        let mut max_expired_by: Option<Duration> = None;
365
366        for ((key, val), update_ts, diff) in updates {
367            // check if the key is expired
368            if let Some(s) = &mut self.expire_state {
369                if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? {
370                    max_expired_by = max_expired_by.max(Some(expired_by));
371                    trace!(
372                        "Expired key: {:?}, expired by: {:?} with time being now={}",
373                        key,
374                        expired_by,
375                        now
376                    );
377                    continue;
378                }
379            }
380
381            // If the `highest_ts` is less than `update_ts`, we need to create a new batch with key being `update_ts`.
382            if self
383                .spine
384                .last_key_value()
385                .map(|(highest_ts, _)| *highest_ts < update_ts)
386                .unwrap_or(true)
387            {
388                self.spine.insert(update_ts, Default::default());
389            }
390
391            // Get the first batch with key that's greater or equal to `update_ts`.
392            let (_, batch) = self
393                .spine
394                .range_mut(update_ts..)
395                .next()
396                .expect("Previous insert should have created the batch");
397
398            let key_updates = batch.entry(key).or_default();
399            key_updates.push((val, update_ts, diff));
400
401            // a stable sort make updates sort in order of insertion
402            // without changing the order of updates within same tick
403            key_updates.sort_by_key(|(_val, ts, _diff)| *ts);
404        }
405        self.update_and_fetch_size();
406        Ok(max_expired_by)
407    }
408
409    /// Find out the time of next update in the future that is the next update with `timestamp > now`.
410    pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
411        // iter over batches that only have updates of `timestamp>now` and find the first non empty batch, then get the minimum timestamp in that batch
412        for (_ts, batch) in self.spine.range((Bound::Excluded(now), Bound::Unbounded)) {
413            let min_ts = batch
414                .iter()
415                .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts).min())
416                .min();
417
418            if min_ts.is_some() {
419                return min_ts;
420            }
421        }
422
423        None
424    }
425
426    /// Get the last compaction time.
427    pub fn last_compaction_time(&self) -> Option<Timestamp> {
428        self.last_compaction_time
429    }
430
431    /// Split spine off at `split_ts`, and return the spine that's before `split_ts` (including `split_ts`).
432    fn split_spine_le(&mut self, split_ts: &Timestamp) -> Spine {
433        self.split_batch_at(split_ts);
434        let mut before = self.spine.split_off(&(split_ts + 1));
435        std::mem::swap(&mut before, &mut self.spine);
436        before
437    }
438
439    /// Split the batch at `split_ts` into two parts.
440    fn split_batch_at(&mut self, split_ts: &Timestamp) {
441        // FAST PATH:
442        //
443        // The `split_ts` hit the boundary of a batch, nothing to do.
444        if self.spine.contains_key(split_ts) {
445            return;
446        }
447
448        let Some((_, batch_to_split)) = self.spine.range_mut(split_ts..).next() else {
449            return; // No batch to split, nothing to do.
450        };
451
452        // SLOW PATH:
453        //
454        // The `split_ts` is in the middle of a batch, we need to split the batch into two parts.
455        let mut new_batch = Batch::default();
456
457        batch_to_split.retain(|key, updates| {
458            let mut new_updates = SmallVec::default();
459
460            updates.retain(|(val, ts, diff)| {
461                if *ts <= *split_ts {
462                    // Move the updates that are less than or equal to `split_ts` to the new batch.
463                    new_updates.push((val.clone(), *ts, *diff));
464                }
465                // Keep the updates that are greater than `split_ts` in the current batch.
466                *ts > *split_ts
467            });
468
469            if !new_updates.is_empty() {
470                new_batch.insert(key.clone(), new_updates);
471            }
472
473            // Keep the key in the current batch if it still has updates.
474            !updates.is_empty()
475        });
476
477        if !new_batch.is_empty() {
478            self.spine.insert(*split_ts, new_batch);
479        }
480    }
481
482    /// Advance time to `now` and consolidate all older (`now` included) updates to the first key.
483    ///
484    /// Return the maximum expire time(already expire by how much time) of all updates if any keys is already expired.
485    pub fn compact_to(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
486        let mut max_expired_by: Option<Duration> = None;
487
488        let batches_to_compact = self.split_spine_le(&now);
489        self.last_compaction_time = Some(now);
490
491        // If a full arrangement is not needed, we can just discard everything before and including now,
492        if !self.full_arrangement {
493            return Ok(None);
494        }
495
496        // else we update them into current state.
497        let mut compacting_batch = Batch::default();
498
499        for (_, batch) in batches_to_compact {
500            for (key, updates) in batch {
501                // check if the key is expired
502                if let Some(s) = &mut self.expire_state {
503                    if let Some(expired_by) =
504                        s.get_expire_duration_and_update_event_ts(now, &key)?
505                    {
506                        max_expired_by = max_expired_by.max(Some(expired_by));
507                        continue;
508                    }
509                }
510
511                let mut row = compacting_batch
512                    .remove(&key)
513                    // only one row in the updates during compaction
514                    .and_then(|mut updates| updates.pop());
515
516                for update in updates {
517                    row = compact_diff_row(row, &update);
518                }
519                if let Some(compacted_update) = row {
520                    compacting_batch.insert(key, smallvec![compacted_update]);
521                }
522            }
523        }
524
525        // insert the compacted batch into spine with key being `now`
526        self.spine.insert(now, compacting_batch);
527        self.update_and_fetch_size();
528        Ok(max_expired_by)
529    }
530
531    /// Get the updates of the arrangement from the given range of time.
532    pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
533        &self,
534        range: R,
535    ) -> Vec<KeyValDiffRow> {
536        // Include the next batch in case the range is not aligned with the boundary of a batch.
537        let batches = match range.end_bound() {
538            Bound::Included(t) => self.spine.range(range.clone()).chain(
539                self.spine
540                    .range((Bound::Excluded(t), Bound::Unbounded))
541                    .next(),
542            ),
543            Bound::Excluded(t) => self.spine.range(range.clone()).chain(
544                self.spine
545                    .range((Bound::Included(t), Bound::Unbounded))
546                    .next(),
547            ),
548            _ => self.spine.range(range.clone()).chain(None),
549        };
550
551        let mut res = vec![];
552        for (_, batch) in batches {
553            for (key, updates) in batch {
554                for (val, ts, diff) in updates {
555                    if range.contains(ts) {
556                        res.push(((key.clone(), val.clone()), *ts, *diff));
557                    }
558                }
559            }
560        }
561        res
562    }
563
564    /// Expire keys in now that are older than expire_time, intended for reducing memory usage and limit late data arrive
565    pub fn truncate_expired_keys(&mut self, now: Timestamp) {
566        if let Some(s) = &mut self.expire_state {
567            if let Some(expired_keys) = s.remove_expired_keys(now) {
568                for key in expired_keys {
569                    for (_, batch) in self.spine.iter_mut() {
570                        batch.remove(&key);
571                    }
572                }
573            }
574        }
575    }
576
577    /// Get current state of things.
578    ///
579    /// Useful for query existing keys (i.e. reduce and join operator need to query existing state)
580    pub fn get(&self, now: Timestamp, key: &Row) -> Option<DiffRow> {
581        // FAST PATH:
582        //
583        // If `now <= last_compaction_time`, and it's full arrangement, we can directly return the value
584        // from the current state (which should be the first batch in the spine if it exist).
585        if let Some(last_compaction_time) = self.last_compaction_time()
586            && now <= last_compaction_time
587            && self.full_arrangement
588        {
589            // if the last compaction time's batch is not exist, it means the spine doesn't have it's first batch as current value
590            return self
591                .spine
592                .get(&last_compaction_time)
593                .and_then(|batch| batch.get(key))
594                .and_then(|updates| updates.first().cloned());
595        }
596
597        // SLOW PATH:
598        //
599        // Accumulate updates from the oldest batch to the batch containing `now`.
600
601        let batches = if self.spine.contains_key(&now) {
602            // hit the boundary of a batch
603            self.spine.range(..=now).chain(None)
604        } else {
605            // not hit the boundary of a batch, should include the next batch
606            self.spine.range(..=now).chain(
607                self.spine
608                    .range((Bound::Excluded(now), Bound::Unbounded))
609                    .next(),
610            )
611        };
612
613        let mut final_val = None;
614        for (ts, batch) in batches {
615            if let Some(updates) = batch.get(key) {
616                if *ts <= now {
617                    for update in updates {
618                        final_val = compact_diff_row(final_val, update);
619                    }
620                } else {
621                    for update in updates.iter().filter(|(_, ts, _)| *ts <= now) {
622                        final_val = compact_diff_row(final_val, update);
623                    }
624                }
625            }
626        }
627        final_val
628    }
629}
630
631fn compact_diff_row(old_row: Option<DiffRow>, new_row: &DiffRow) -> Option<DiffRow> {
632    let (val, ts, diff) = new_row;
633    match (old_row, diff) {
634        (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff == 0 => {
635            // the key is deleted now
636            None
637        }
638        (Some((row, _old_ts, old_diff)), diff) if row == *val && old_diff + diff != 0 => {
639            Some((row, *ts, old_diff + *diff))
640        }
641        // if old val not equal new val, simple consider it as being overwritten, for each key can only have one value
642        // so it make sense to just replace the old value with new value
643        _ => Some((val.clone(), *ts, *diff)),
644    }
645}
646
647/// Simply a type alias for ReadGuard of Arrangement
648pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
649/// Simply a type alias for WriteGuard of Arrangement
650pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
651
652/// A handler to the inner Arrangement, can be cloned and shared, useful for query it's inner state
653#[derive(Debug, Clone)]
654pub struct ArrangeHandler {
655    inner: Arc<RwLock<Arrangement>>,
656}
657
658impl ArrangeHandler {
659    /// create a new handler from arrangement
660    pub fn from(arr: Arrangement) -> Self {
661        Self {
662            inner: Arc::new(RwLock::new(arr)),
663        }
664    }
665
666    /// write lock the arrangement
667    pub fn write(&self) -> ArrangeWriter<'_> {
668        self.inner.blocking_write()
669    }
670
671    /// read lock the arrangement
672    pub fn read(&self) -> ArrangeReader<'_> {
673        self.inner.blocking_read()
674    }
675
676    /// Clone the handler, but only keep the future updates.
677    ///
678    /// It's a cheap operation, since it's `Arc-ed` and only clone the `Arc`.
679    pub fn clone_future_only(&self) -> Option<Self> {
680        if self.read().is_written {
681            return None;
682        }
683        Some(Self {
684            inner: self.inner.clone(),
685        })
686    }
687
688    /// Clone the handler, but keep all updates.
689    ///
690    /// Prevent illegal clone after the arrange have been written,
691    /// because that will cause loss of data before clone.
692    ///
693    /// It's a cheap operation, since it's `Arc-ed` and only clone the `Arc`.
694    pub fn clone_full_arrange(&self) -> Option<Self> {
695        {
696            let zelf = self.read();
697            if !zelf.full_arrangement && zelf.is_written {
698                return None;
699            }
700        }
701
702        self.write().full_arrangement = true;
703        Some(Self {
704            inner: self.inner.clone(),
705        })
706    }
707
708    pub fn set_full_arrangement(&self, full: bool) {
709        self.write().full_arrangement = full;
710    }
711
712    pub fn is_full_arrangement(&self) -> bool {
713        self.read().full_arrangement
714    }
715}
716
717#[cfg(test)]
718mod test {
719    use std::borrow::Borrow;
720
721    use datatypes::value::Value;
722    use itertools::Itertools;
723
724    use super::*;
725
726    fn lit(v: impl Into<Value>) -> Row {
727        Row::new(vec![v.into()])
728    }
729
730    fn kv(key: impl Borrow<Row>, value: impl Borrow<Row>) -> (Row, Row) {
731        (key.borrow().clone(), value.borrow().clone())
732    }
733
734    #[test]
735    fn test_future_get() {
736        // test if apply only future updates, whether get(future_time) can operate correctly
737        let arr = ArrangeHandler::from(Arrangement::default());
738
739        let mut arr = arr.write();
740
741        let key = lit("a");
742        let updates: Vec<KeyValDiffRow> = vec![
743            (kv(&key, lit("b")), 1 /* ts */, 1 /* diff */),
744            (kv(&key, lit("c")), 2 /* ts */, 1 /* diff */),
745            (kv(&key, lit("d")), 3 /* ts */, 1 /* diff */),
746        ];
747
748        // all updates above are future updates
749        arr.apply_updates(0, updates).unwrap();
750
751        assert_eq!(arr.get(1, &key), Some((lit("b"), 1 /* ts */, 1 /* diff */)));
752        assert_eq!(arr.get(2, &key), Some((lit("c"), 2 /* ts */, 1 /* diff */)));
753        assert_eq!(arr.get(3, &key), Some((lit("d"), 3 /* ts */, 1 /* diff */)));
754    }
755
#[test]
fn only_save_future_updates() {
    // The temporal filter of the mfp operator has to remember future updates so that it can
    // retract a record on time, e.g. insert a record now and delete it 5 minutes later.
    // Such an operator only needs to keep future updates (as long as no downstream operator
    // requires the full arrangement).
    let arr = ArrangeHandler::from(Arrangement::default());

    // nothing has been written yet: both kinds of clone are expected to succeed
    {
        let full_handle = arr.clone_full_arrange();
        assert!(full_handle.is_some());
        let future_handle = arr.clone_future_only();
        assert!(future_handle.is_some());
    }

    // (key, value, ts) of every record used by this test, each applied with diff = 1
    let records = [("a", "x", 1), ("b", "y", 2), ("c", "z", 3)];

    {
        let mut state = arr.write();
        // applied at now = 0, so every record is a future update
        let updates: Vec<KeyValDiffRow> = records
            .iter()
            .map(|&(k, v, ts)| (kv(lit(k), lit(v)), ts, 1 /* diff */))
            .collect();
        state.apply_updates(0, updates).unwrap();

        let first_only = vec![(kv(lit("a"), lit("x")), 1 /* ts */, 1 /* diff */)];
        assert_eq!(state.get_updates_in_range(1..=1), first_only);
        assert_eq!(state.spine.len(), 3);

        // compacting to ts = 1 is expected to leave the number of batches untouched
        state.compact_to(1).unwrap();
        assert_eq!(state.spine.len(), 3);

        for &(k, v, ts) in &records {
            assert_eq!(state.get(3, &lit(k)), Some((lit(v), ts, 1 /* diff */)));
        }
    }

    // after the write above, a future-only clone is expected to be refused
    assert!(arr.clone_future_only().is_none());
    {
        let full_handle = arr.clone_full_arrange().unwrap();
        let mut state = full_handle.write();
        assert_eq!(state.spine.len(), 3);

        // compacting to ts = 2 is expected to reduce the spine to two batches
        // while every key stays readable
        state.compact_to(2).unwrap();
        assert_eq!(state.spine.len(), 2);

        for &(k, v, ts) in &records {
            assert_eq!(state.get(3, &lit(k)), Some((lit(v), ts, 1 /* diff */)));
        }
    }
}
813
#[test]
fn test_reduce_expire_keys() {
    let mut arr = Arrangement::default();
    arr.expire_state = Some(KeyExpiryManager {
        event_ts_to_key: Default::default(),
        key_expiration_duration: Some(10),
        event_timestamp_from_row: Some(ScalarExpr::Column(0)),
    });
    arr.full_arrangement = true;

    let handler = ArrangeHandler::from(arr);

    // (key, value, ts); presumably the key (column 0) also serves as the event timestamp
    let records = [(1i64, "x", 1), (2i64, "y", 2), (3i64, "z", 3)];
    let updates: Vec<KeyValDiffRow> = records
        .iter()
        .map(|&(k, v, ts)| (kv(lit(k), lit(v)), ts, 1 /* diff */))
        .collect();

    {
        let mut state = handler.write();
        state.apply_updates(0, updates.clone()).unwrap();
        // applying the same updates again means having multiple updates for the same key
        state.apply_updates(0, updates).unwrap();

        let first = (kv(lit(1i64), lit("x")), 1 /* ts */, 1 /* diff */);
        assert_eq!(
            state.get_updates_in_range(1..=1),
            vec![first.clone(), first]
        );
        assert_eq!(state.spine.len(), 3);
        state.compact_to(1).unwrap();
        assert_eq!(state.spine.len(), 3);
    }

    {
        let mut state = handler.write();
        assert_eq!(state.spine.len(), 3);

        // Each round: (now, smallest key still expected to be readable,
        // expected number of entries left in `event_ts_to_key`, when it is checked)
        let rounds = [(11, 1i64, None), (12, 2, Some(2)), (13, 3, Some(1))];
        for (now, first_live_key, tracked_event_ts) in rounds {
            state.truncate_expired_keys(now);
            assert_eq!(state.spine.len(), 3);

            for &(k, v, ts) in &records {
                // surviving keys report the accumulated diff of both applications
                let expected = (k >= first_live_key).then(|| (lit(v), ts, 2 /* diff */));
                assert_eq!(state.get(now, &lit(k)), expected);
            }

            if let Some(count) = tracked_event_ts {
                assert_eq!(
                    state.expire_state.as_ref().unwrap().event_ts_to_key.len(),
                    count
                );
            }
        }
    }
}
884
#[test]
fn test_apply_expired_keys() {
    // apply updates that contain an already expired key
    let mut arr = Arrangement::default();
    arr.expire_state = Some(KeyExpiryManager {
        event_ts_to_key: Default::default(),
        key_expiration_duration: Some(10),
        event_timestamp_from_row: Some(ScalarExpr::Column(0)),
    });

    let handler = ArrangeHandler::from(arr);

    let updates: Vec<KeyValDiffRow> = [(1i64, "x", 1), (2i64, "y", 2)]
        .into_iter()
        .map(|(k, v, ts)| (kv(lit(k), lit(v)), ts, 1 /* diff */))
        .collect();

    {
        let mut state = handler.write();
        // applied at now = 12; the reported expiration amount is asserted to be 1
        let expired_by = state.apply_updates(12, updates).unwrap();
        assert_eq!(expired_by, Some(1));

        // key 1 is not readable, key 2 is
        let expectations = [
            (1i64, None),
            (2i64, Some((lit("y"), 2 /* ts */, 1 /* diff */))),
        ];
        for (k, expected) in expectations {
            assert_eq!(state.get(12, &lit(k)), expected);
        }
    }
}
913
/// Checks `split_spine_le` with a split point that is not aligned with batch boundaries:
/// it should still hand back every update up to the split point, including updates stored
/// in the batch that straddles the boundary of the requested range.
#[test]
fn test_split_off() {
    let mut arr = Arrangement::default();
    // manually create the batches ..=1 and 2..=3
    for batch_end in [1, 3] {
        arr.spine.insert(batch_end, Batch::default());
    }

    // this update falls into the range of batch 2..=3
    let updates = vec![(kv(lit("a"), lit("x")), 2 /* ts */, 1 /* diff */)];
    arr.apply_updates(2, updates).unwrap();

    // keep an identical copy around for the second scenario
    let mut untouched = arr.clone();

    {
        assert_eq!(arr.get_next_update_time(&1), Some(2));
        // the split is expected to take batch ..=1 and to create a new batch 2..=2
        // which contains the update
        let split = arr.split_spine_le(&2);
        assert_eq!(split.len(), 2);
        assert_eq!(split.get(&2).map(|batch| batch.len()), Some(1));

        assert_eq!(arr.get_next_update_time(&1), None);
    }

    {
        // taking everything with timestamp <= 1 yields one batch without any update
        let split = untouched.split_spine_le(&1);
        assert_eq!(split.len(), 1);
        assert_eq!(split.get(&1).map(|batch| batch.len()), Some(0));
    }
}
946
/// Checks that `get_updates_in_range` returns the correct updates when the requested
/// range is not aligned with the boundaries of the batches.
#[test]
fn test_get_by_range() {
    let mut arr = Arrangement::default();

    // every row has an empty value and diff = 1; only key and ts vary
    let row = |key: &'static str, ts| -> KeyValDiffRow {
        (kv(lit(key), lit("")), ts, 1 /* diff */)
    };

    // will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
    // TODO(discord9): manually set batch
    let updates: Vec<KeyValDiffRow> = [("a", 2), ("a", 1), ("b", 4), ("c", 3), ("c", 6), ("a", 5)]
        .into_iter()
        .map(|(key, ts)| row(key, ts))
        .collect();
    arr.apply_updates(0, updates).unwrap();

    let expected: Vec<KeyValDiffRow> = [("a", 2), ("b", 4), ("c", 3), ("a", 5)]
        .into_iter()
        .map(|(key, ts)| row(key, ts))
        .collect();
    assert_eq!(arr.get_updates_in_range(2..=5), expected);
}
974
/// Checks that `get` returns the correct result when the queried timestamp is not aligned
/// with a batch boundary.
#[test]
fn test_get_unaligned() {
    let mut arr = Arrangement::default();

    let key = lit("a");
    // (ts, value) pairs, all for the same key and with diff = 1
    // will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
    // TODO(discord9): manually set batch
    let updates: Vec<KeyValDiffRow> = [(2, 1), (1, 2), (4, 3), (3, 4), (6, 5), (5, 6)]
        .into_iter()
        .map(|(ts, val)| (kv(&key, lit(val)), ts, 1 /* diff */))
        .collect();
    arr.apply_updates(0, updates).unwrap();

    // ts = 2 is aligned with a batch boundary
    let aligned = arr.get(2, &key);
    assert_eq!(aligned, Some((lit(1), 2 /* ts */, 1 /* diff */)));

    // ts = 3 is not aligned with a batch boundary
    let unaligned = arr.get(3, &key);
    assert_eq!(unaligned, Some((lit(4), 3 /* ts */, 1 /* diff */)));
}
998
/// Checks that updates applied out of timestamp order are read back sorted by timestamp.
#[test]
fn test_out_of_order_apply_updates() {
    let mut arr = Arrangement::default();

    let key = lit("a");
    // (value, ts, diff) triples for a single key, deliberately not ordered by ts
    let updates: Vec<KeyValDiffRow> = [
        (5, 6, 1),
        (2, 2, -1),
        (1, 2, 1),
        (2, 1, 1),
        (3, 4, 1),
        (4, 3, 1),
        (6, 5, 1),
    ]
    .into_iter()
    .map(|(val, ts, diff)| (kv(&key, lit(val)), ts, diff))
    .collect();

    // stable sort by ts: updates sharing a timestamp keep their insertion order
    let expected = updates
        .iter()
        .cloned()
        .sorted_by_key(|(_, ts, _)| *ts)
        .collect_vec();

    arr.apply_updates(0, updates).unwrap();
    assert_eq!(arr.get_updates_in_range(1..7), expected);
}
1022
#[test]
fn test_full_arrangement_get_from_first_entry() {
    let mut arr = Arrangement::default();
    let key_b = lit("b");

    // will form {3: [1, 2, 3]}
    let updates: Vec<KeyValDiffRow> = vec![
        (kv(lit("a"), lit("x")), 3 /* ts */, 1 /* diff */),
        (kv(&key_b, lit("y")), 1 /* ts */, 1 /* diff */),
        (kv(&key_b, lit("y")), 2 /* ts */, -1 /* diff */),
    ];
    arr.apply_updates(0, updates).unwrap();

    // at ts = 2 the insertion of "b" has been cancelled by its deletion
    let before_flag = arr.get(2, &key_b);
    assert_eq!(before_flag, None /* deleted */);

    // switching to a full arrangement must not change that answer
    arr.full_arrangement = true;
    let after_flag = arr.get(2, &key_b);
    assert_eq!(after_flag, None /* still deleted */);

    arr.compact_to(1).unwrap();

    // at ts = 1 only the insertion is visible
    let at_first_ts = arr.get(1, &key_b);
    assert_eq!(at_first_ts, Some((lit("y"), 1, 1)) /* fast path */);
}
1044}