pub struct Arrangement {
    name: Vec<String>,
    spine: BTreeMap<i64, BTreeMap<Row, SmallVec<[(Row, i64, i64); 2]>>>,
    full_arrangement: bool,
    is_written: bool,
    expire_state: Option<KeyExpiryManager>,
    last_compaction_time: Option<i64>,
    estimated_size: usize,
    last_size_update: Instant,
    size_update_interval: Duration,
}
Shared key-value state used by various operators during dataflow execution.
For example, an Mfp operator with a temporal filter needs to store its future output so that it can add a row now and delete it later.
To get all needed updates in a time span, use [get_updates_in_range].
A reduce operator needs the full state of its output so that it can query existing state (and modify it by calling [apply_updates]), and it also needs a way to expire keys. To get a key's current value, use [get] with the time set to now.
So the pipeline looks like:
mfp operator -> arrange(store futures only, no expire) -> reduce operator <-> arrange(full, with key expiring time) -> output
Note the two-way arrow between the reduce operator and arrange: the reduce operator needs to both query and update existing state.
Fields§
name: Vec<String>
A name or identifier for the arrangement which can be used for debugging or logging purposes. This field is not critical to the functionality but aids in monitoring and management of arrangements.
spine: BTreeMap<i64, BTreeMap<Row, SmallVec<[(Row, i64, i64); 2]>>>
Manages a collection of pending updates in a BTreeMap where each key is a timestamp and each value is a Batch of updates.
Updates are grouped into batches based on their timestamps. Each batch covers a range of time from the previous key (exclusive) to the current key (inclusive).
- Updates with a timestamp (update_ts) that falls between two keys are placed in the batch of the higher key. For example, if the keys are 1, 5, 7, 9 and update_ts is 6, the update goes into the batch with key 7.
- Updates with a timestamp before the first key are categorized under the first key.
- Updates with a timestamp greater than the highest key result in a new batch being created with that timestamp as the key.
The first key represents the current state and includes consolidated updates from the past. It is always set to now.
In the batch representing the current time (now), each key should have only one update, with diff=1.
Since updates typically occur as a delete followed by an insert, a small vector of size 2 is used to store updates for efficiency.
TODO(discord9): Consider balancing the batch size?
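The batch placement rule described for spine can be sketched with a plain BTreeMap range lookup. This is a minimal illustration, not the crate's implementation: String stands in for Row, a flat Vec stands in for the per-key SmallVec, and the names Batch, batch_key_for, and insert_update are hypothetical.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a batch (assumption): `String` replaces `Row`, and
/// each entry is a `(key, value, update_ts, diff)` tuple.
type Batch = Vec<(String, String, i64, i64)>;

/// Returns the key of the batch that an update at `update_ts` belongs to.
fn batch_key_for(spine: &BTreeMap<i64, Batch>, update_ts: i64) -> i64 {
    // The first key >= update_ts owns the range (previous key, key].
    // This also covers timestamps before the first key.
    // If no such key exists, a new batch keyed by `update_ts` is needed.
    spine
        .range(update_ts..)
        .next()
        .map(|(key, _)| *key)
        .unwrap_or(update_ts)
}

/// Inserts one update into the batch chosen by `batch_key_for`,
/// creating the batch if it does not exist yet.
fn insert_update(
    spine: &mut BTreeMap<i64, Batch>,
    key: &str,
    val: &str,
    update_ts: i64,
    diff: i64,
) {
    let batch_key = batch_key_for(spine, update_ts);
    spine
        .entry(batch_key)
        .or_default()
        .push((key.to_string(), val.to_string(), update_ts, diff));
}
```

With keys 1, 5, 7, 9, an update at timestamp 6 lands in the batch keyed 7, an update at 0 lands in the batch keyed 1, and an update at 12 creates a new batch keyed 12.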
full_arrangement: bool
Indicates whether the arrangement maintains a complete history of updates.
- true: Maintains all past and future updates, necessary for full state reconstruction at any point in time.
- false: Only future updates are retained, optimizing for scenarios where past state is irrelevant and conserving resources. Useful for cases like map -> arrange -> reduce.
is_written: bool
Indicates whether the arrangement has been modified since its creation.
- true: The arrangement has been written to, meaning it has received updates. Cloning this arrangement is generally unsafe as it may lead to inconsistencies if the clone is modified independently. However, cloning is safe when both the original and the clone require a full arrangement, as this ensures consistency.
- false: The arrangement is in its initial state and has not been modified. It can be safely cloned and shared without concerns of carrying over unintended state changes.
expire_state: Option<KeyExpiryManager>
Manages the expiry state of the arrangement.
last_compaction_time: Option<i64>
The time at which the last compaction happened, also known as the current time.
estimated_size: usize
Estimated heap size of the arrangement.
last_size_update: Instant
size_update_interval: Duration
Implementations§
impl Arrangement
fn compute_size(&self) -> usize
fn update_and_fetch_size(&mut self) -> usize
impl Arrangement
pub fn new_with_name(name: Vec<String>) -> Self
pub fn get_expire_state(&self) -> Option<&KeyExpiryManager>
pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager)
pub fn apply_updates(&mut self, now: i64, updates: Vec<((Row, Row), i64, i64)>) -> Result<Option<i64>, EvalError>
Applies updates to the spine, regardless of whether they are in the future, in the past, or at now.
Returns the maximum expired duration (that is, how long ago a key expired) among all updates, if any key has already expired.
pub fn get_next_update_time(&self, now: &i64) -> Option<i64>
Finds the time of the next future update, that is, the earliest update with timestamp > now.
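As a rough sketch of how such a lookup could work on a spine keyed by batch upper bounds, the function below scans only batches keyed after now and keeps the smallest timestamp strictly greater than now. It assumes a simplified layout (String in place of Row, a flat Vec per batch); Batch and next_update_time are hypothetical names, not the crate's code.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Simplified stand-in for a batch (assumption): entries are
/// `(key, value, update_ts, diff)` with `String` replacing `Row`.
type Batch = Vec<(String, String, i64, i64)>;

/// Returns the earliest update timestamp strictly greater than `now`, if any.
fn next_update_time(spine: &BTreeMap<i64, Batch>, now: i64) -> Option<i64> {
    spine
        // A batch keyed k only holds updates with ts <= k, so batches keyed
        // <= now cannot contain a future update and are skipped.
        .range((Bound::Excluded(now), Bound::Unbounded))
        .flat_map(|(_, batch)| batch.iter().map(|(_, _, ts, _)| *ts))
        // A batch keyed after `now` may still hold updates at or before `now`.
        .filter(|ts| *ts > now)
        .min()
}
```

A caller could use the returned timestamp to decide when the operator next needs to be scheduled; None means no future update is pending.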
pub fn last_compaction_time(&self) -> Option<i64>
Gets the last compaction time.
fn split_spine_le(&mut self, split_ts: &i64) -> BTreeMap<i64, BTreeMap<Row, SmallVec<[(Row, i64, i64); 2]>>>
Splits the spine at split_ts and returns the part at or before split_ts (split_ts included).
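One way to split a BTreeMap so that the returned half is inclusive of the split key is to combine BTreeMap::split_off with a swap. The sketch below shows only that map-level step under simplifying assumptions: it is generic over the batch type, the name split_le is hypothetical, and it does not split a batch that straddles split_ts.

```rust
use std::collections::BTreeMap;

/// Removes and returns all entries with key <= `split_ts`,
/// leaving the entries with key > `split_ts` in `spine`.
fn split_le<V>(spine: &mut BTreeMap<i64, V>, split_ts: i64) -> BTreeMap<i64, V> {
    // `split_off(&k)` returns the entries with key >= k and keeps key < k in
    // place, so splitting at `split_ts + 1` makes the cut inclusive.
    let after = match split_ts.checked_add(1) {
        Some(next) => spine.split_off(&next),
        // split_ts == i64::MAX: every entry is at or before the split point.
        None => BTreeMap::new(),
    };
    // Swap so that `spine` keeps the later part and the earlier part is returned.
    std::mem::replace(spine, after)
}
```

The checked_add guard avoids an overflow when split_ts is i64::MAX; in that case the whole map is returned and spine is left empty.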
fn split_batch_at(&mut self, split_ts: &i64)
Splits the batch at split_ts into two parts.
pub fn compact_to(&mut self, now: i64) -> Result<Option<i64>, EvalError>
Advances time to now and consolidates all older updates (now included) into the first key.
Returns the maximum expired duration (that is, how long ago a key expired) among all updates, if any key has already expired.
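Consolidation here means collapsing every update at or before now into a net diff per (key, value) pair. The following is a minimal sketch of that idea only, assuming String in place of Row and a flat list of updates; Update and consolidate_up_to are hypothetical names, and key expiry is left out.

```rust
use std::collections::BTreeMap;

/// Simplified update (assumption): `((key, value), ts, diff)` with `String`
/// standing in for `Row`.
type Update = ((String, String), i64, i64);

/// Sums the diffs of all updates with `ts <= now` per (key, value) pair and
/// drops pairs whose diffs cancel out to zero.
fn consolidate_up_to(updates: &[Update], now: i64) -> BTreeMap<(String, String), i64> {
    let mut state: BTreeMap<(String, String), i64> = BTreeMap::new();
    for (kv, ts, diff) in updates {
        if *ts <= now {
            *state.entry(kv.clone()).or_insert(0) += *diff;
        }
    }
    // An insert followed by a matching delete leaves no trace in the result.
    state.retain(|_, diff| *diff != 0);
    state
}
```

For example, an insert of (a, 1) at time 1 followed by a delete of (a, 1) and an insert of (a, 2) at time 3 consolidates, at now = 5, to the single entry (a, 2) with diff 1.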
pub fn get_updates_in_range<R: RangeBounds<i64> + Clone>(&self, range: R) -> Vec<((Row, Row), i64, i64)>
Gets the updates of the arrangement that fall within the given time range.
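Because the range parameter is bounded by RangeBounds<i64>, callers can pass any standard range form (a..b, a..=b, ..b, or ..). The sketch below illustrates that filtering over a flat list of updates. It is an assumption-laden stand-in, with String replacing Row and the hypothetical names Update and updates_in_range, rather than the method's actual body.

```rust
use std::ops::RangeBounds;

/// Simplified update (assumption): `((key, value), ts, diff)` with `String`
/// standing in for `Row`.
type Update = ((String, String), i64, i64);

/// Returns clones of all updates whose timestamp lies inside `range`.
fn updates_in_range<R: RangeBounds<i64>>(updates: &[Update], range: R) -> Vec<Update> {
    updates
        .iter()
        // `RangeBounds::contains` honours inclusive and exclusive bounds alike.
        .filter(|(_, ts, _)| range.contains(ts))
        .cloned()
        .collect()
}
```

A temporal-filter operator could, for instance, pass (last_now + 1)..=now to fetch only the updates that became visible since its previous run.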
pub fn truncate_expired_keys(&mut self, now: i64)
Expires keys that, as of now, are older than expire_time. Intended to reduce memory usage and to limit how late data can arrive.
Trait Implementations§
impl Clone for Arrangement
fn clone(&self) -> Arrangement
fn clone_from(&mut self, source: &Self)
impl Debug for Arrangement
impl Default for Arrangement
impl GetSize for Arrangement
fn get_heap_size(&self) -> usize
fn get_stack_size() -> usize
fn get_heap_size_with_tracker<T>(&self, tracker: T) -> (usize, T)
where
    T: GetSizeTracker,
fn get_size_with_tracker<T>(&self, tracker: T) -> (usize, T)
where
    T: GetSizeTracker,
impl Ord for Arrangement
fn cmp(&self, other: &Arrangement) -> Ordering
fn max(self, other: Self) -> Self
where
    Self: Sized,
impl PartialEq for Arrangement
impl PartialOrd for Arrangement
impl Eq for Arrangement
impl StructuralPartialEq for Arrangement
Auto Trait Implementations§
impl !Freeze for Arrangement
impl !RefUnwindSafe for Arrangement
impl Send for Arrangement
impl Sync for Arrangement
impl Unpin for Arrangement
impl !UnwindSafe for Arrangement
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<Q, K> Comparable<K> for Q
impl<T> Conv for T
impl<T, V> Convert<T> for V
where
    V: Into<T>,
fn convert(value: Self) -> T
fn convert_box(value: Box<Self>) -> Box<T>
fn convert_vec(value: Vec<Self>) -> Vec<T>
fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>
fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>
fn convert_option(value: Option<Self>) -> Option<T>
fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>
fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Q, K> Equivalent<K> for Q
fn equivalent(&self, key: &K) -> bool
Compare self to key and return true if they are equal.
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    &'a Self: for<'a> IntoIterator,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T, U, I> LiftInto<U, I> for T
where
    U: LiftFrom<T, I>,
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(&'a mut self, func: impl FnOnce(&'a mut B) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> PolicyExt for T
where
    T: ?Sized,
impl<Source> Sculptor<HNil, HNil> for Source
impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct self from the equivalent element of its superset.
fn is_in_subset(&self) -> bool
Checks if self is actually part of its subset T (and can be converted to it).
fn to_subset_unchecked(&self) -> SS
Use with care! Same as self.to_subset but without any property checks. Always succeeds.
fn from_subset(element: &SS) -> SP
The inclusion map: converts self to the equivalent element of its superset.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.