pub struct Context<'referred, 'df> {
    pub id: GlobalId,
    pub df: &'referred mut Hydroflow<'df>,
    pub compute_state: &'referred mut DataflowState,
    pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
    pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
    pub input_collection_batch: BTreeMap<GlobalId, CollectionBundle<Batch>>,
    pub local_scope_batch: Vec<BTreeMap<LocalId, CollectionBundle<Batch>>>,
    pub err_collector: ErrCollector,
}
The Context for building an operator identified by a GlobalId.
Fields
id: GlobalId
df: &'referred mut Hydroflow<'df>
compute_state: &'referred mut DataflowState
input_collection: BTreeMap<GlobalId, CollectionBundle>
A list of all collections being used in the operator.
TODO(discord9): remove the extra clone by counting usages and removing the collection on its last use?
local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>
Used by the Get/Let Plan for getting/setting local variables.
TODO(discord9): consider using Vec<(LocalId, CollectionBundle)> instead.
input_collection_batch: BTreeMap<GlobalId, CollectionBundle<Batch>>
A list of all collections being used in the operator.
TODO(discord9): remove the extra clone by counting usages and removing the collection on its last use?
local_scope_batch: Vec<BTreeMap<LocalId, CollectionBundle<Batch>>>
Used by the Get/Let Plan for getting/setting local variables.
TODO(discord9): consider using Vec<(LocalId, CollectionBundle)> instead.
err_collector: ErrCollector
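The local_scope field is a stack of maps, which suggests lexical scoping for Get/Let. The following is a minimal sketch of that lookup pattern, not the crate's implementation. It assumes the innermost scope shadows outer ones; `ScopeStack`, `push_scope`, `pop_scope`, and `get_local` are hypothetical names, and `LocalId` and `Bundle` are simplified stand-ins for the real types.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the crate's `LocalId` and `CollectionBundle`.
type LocalId = u64;
type Bundle = String;

/// A stack of scopes; the last element is the innermost scope.
#[derive(Default)]
pub struct ScopeStack {
    scopes: Vec<BTreeMap<LocalId, Bundle>>,
}

impl ScopeStack {
    /// Enter a new, empty scope (for example when rendering the body of a `Let`).
    pub fn push_scope(&mut self) {
        self.scopes.push(BTreeMap::new());
    }

    /// Leave the innermost scope, dropping its bindings.
    pub fn pop_scope(&mut self) {
        self.scopes.pop();
    }

    /// Bind `id` in the innermost scope, creating a scope if none exists yet.
    pub fn insert_local(&mut self, id: LocalId, bundle: Bundle) {
        if self.scopes.is_empty() {
            self.scopes.push(BTreeMap::new());
        }
        if let Some(scope) = self.scopes.last_mut() {
            scope.insert(id, bundle);
        }
    }

    /// Look `id` up from the innermost scope outwards (assumed shadowing rule).
    pub fn get_local(&self, id: LocalId) -> Option<&Bundle> {
        self.scopes.iter().rev().find_map(|scope| scope.get(&id))
    }
}
```

With this layout a binding made inside a nested scope hides an outer binding with the same id until the nested scope is popped. A flat Vec<(LocalId, Bundle)>, as the TODO suggests, would trade map lookups for a linear scan.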
Implementations
impl Context<'_, '_>
pub fn render_mfp_batch(
    &mut self,
    input: Box<TypedPlan>,
    mfp: MapFilterProject,
    _output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error>
Like render_mfp, but in batch mode.
pub fn render_mfp(
    &mut self,
    input: Box<TypedPlan>,
    mfp: MapFilterProject,
) -> Result<CollectionBundle, Error>
Renders a MapFilterProject; each row is emitted only once. All incoming rows are assumed to have a sys time of `now`, and a row's stated sys time is ignored.
TODO(discord9): schedule the mfp operator to run when a temporal filter needs it.
MapFilterProject (mfp for short) is scheduled to run when there are enough input updates, or when a future update in its output buffer (an Arrangement) is due to be emitted now.
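As an illustration of the map, filter, project steps and of the "stated sys time is ignored" rule, here is a small standalone sketch. It does not use the crate's MapFilterProject type: `toy_mfp`, the `Row` alias, and the reading of the `(Row, i64, i64)` tuple as `(row, sys_time, diff)` are assumptions made for the example.

```rust
/// A row is modelled as a vector of integers (hypothetical simplification).
type Row = Vec<i64>;
/// Assumed layout: `(row, sys_time, diff)`.
type Update = (Row, i64, i64);

/// Apply one map expression, one filter predicate and a projection to every
/// update, stamping all surviving rows with `now`.
pub fn toy_mfp<M, F>(
    updates: Vec<Update>,
    now: i64,
    map: M,
    filter: F,
    projection: &[usize],
) -> Vec<Update>
where
    M: Fn(&Row) -> i64,
    F: Fn(&Row) -> bool,
{
    updates
        .into_iter()
        .filter_map(|(mut row, _stated_time, diff)| {
            // Map: append one computed column to the row.
            let computed = map(&row);
            row.push(computed);
            // Filter: drop rows that fail the predicate.
            if !filter(&row) {
                return None;
            }
            // Project: keep only the requested columns, in the given order.
            let projected: Row = projection.iter().map(|&i| row[i]).collect();
            // The row's stated time is ignored and replaced with `now`.
            Some((projected, now, diff))
        })
        .collect()
}
```

For instance, mapping `r[0] + r[1]`, filtering on the new column being positive, and projecting columns `[2, 0]` turns the row `[1, 2]` into `[3, 1]` and drops the row `[-5, 2]`.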
impl Context<'_, '_>
const REDUCE_BATCH: &'static str = "reduce_batch"
const REDUCE: &'static str = "reduce"
pub fn render_reduce_batch(
    &mut self,
    input: Box<TypedPlan>,
    key_val_plan: &KeyValPlan,
    reduce_plan: &ReducePlan,
    output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error>
Like render_reduce, but for batch mode. This is only a bare-bones implementation: distinct aggregation is not supported for now.
pub fn render_reduce(
    &mut self,
    input: Box<TypedPlan>,
    key_val_plan: KeyValPlan,
    reduce_plan: ReducePlan,
    output_type: RelationType,
) -> Result<CollectionBundle, Error>
Renders Plan::Reduce into an executable dataflow.
fn add_accum_distinct_input_arrange(
    &mut self,
    reduce_plan: &ReducePlan,
) -> Option<Vec<ArrangeHandler>>
Contrary to its name, this adds the distinct input for an accumulable reduce plan with distinct input, such as select COUNT(DISTINCT col) from table.
The return value is an optional list of arrangements created for the distinct inputs. It should have the same length as the number of distinct aggregations in the accumulable reduce plan.
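To make the role of a per-aggregation "distinct input" store concrete, here is a minimal sketch of COUNT(DISTINCT col) grouped by key. It is not the crate's arrangement-based implementation: `DistinctCount` is a hypothetical type, and an in-memory BTreeSet stands in for the arrangement that remembers which values were already seen.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Counts distinct values per key by remembering every value seen so far.
#[derive(Default)]
pub struct DistinctCount {
    seen: BTreeMap<String, BTreeSet<i64>>,
}

impl DistinctCount {
    /// Feed one `(key, value)` input and return the key's distinct count so far.
    pub fn insert(&mut self, key: &str, value: i64) -> usize {
        let values = self.seen.entry(key.to_string()).or_default();
        // Inserting a value that is already present leaves the set unchanged.
        values.insert(value);
        values.len()
    }

    /// Current distinct count for `key`, or 0 if the key was never seen.
    pub fn count(&self, key: &str) -> usize {
        self.seen.get(key).map_or(0, |values| values.len())
    }
}
```

One such store would be needed per distinct aggregation, which matches the statement that the returned list has one entry per distinct aggregation in the plan.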
impl Context<'_, '_>
pub fn render_source_batch(
    &mut self,
    src_recv: Receiver<Batch>,
) -> Result<CollectionBundle<Batch>, Error>
Simply sends the batch downstream, without fancy features like buffering.
pub fn render_source(
    &mut self,
    src_recv: Receiver<(Row, i64, i64)>,
) -> Result<CollectionBundle, Error>
Renders a source that comes from a broadcast channel into the dataflow. Updates whose timestamp is not greater than now are sent immediately; the rest are buffered in an arrangement.
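The "send what is due, buffer the rest" behaviour can be sketched without the dataflow machinery. The example below is an assumption-laden illustration: `SourceBuffer` is a hypothetical type, a BTreeMap keyed by timestamp stands in for the arrangement, and the tuple is read as `(row, timestamp, diff)`.

```rust
use std::collections::BTreeMap;

type Row = Vec<i64>;
/// Assumed layout: `(row, timestamp, diff)`.
type Update = (Row, i64, i64);

/// Holds updates whose timestamp is still in the future.
#[derive(Default)]
pub struct SourceBuffer {
    pending: BTreeMap<i64, Vec<Update>>,
}

impl SourceBuffer {
    /// Accept newly received updates and return everything due at `now`:
    /// buffered updates that have become due, followed by new updates whose
    /// timestamp is not greater than `now`. Later updates are buffered.
    pub fn receive(&mut self, incoming: Vec<Update>, now: i64) -> Vec<Update> {
        let mut ready = self.drain_due(now);
        for update in incoming {
            if update.1 <= now {
                ready.push(update);
            } else {
                self.pending.entry(update.1).or_default().push(update);
            }
        }
        ready
    }

    /// Remove and return all buffered updates with timestamp <= `now`.
    fn drain_due(&mut self, now: i64) -> Vec<Update> {
        let due_keys: Vec<i64> = self.pending.range(..=now).map(|(ts, _)| *ts).collect();
        let mut due = Vec::new();
        for ts in due_keys {
            if let Some(updates) = self.pending.remove(&ts) {
                due.extend(updates);
            }
        }
        due
    }

    /// Number of updates still waiting for their timestamp.
    pub fn buffered(&self) -> usize {
        self.pending.values().map(Vec::len).sum()
    }
}
```

Calling `receive` again with a later `now` and no new input releases the updates that were buffered earlier.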
pub fn render_unbounded_sink_batch(
    &mut self,
    bundle: CollectionBundle<Batch>,
    sender: UnboundedSender<Batch>,
)
pub fn render_unbounded_sink(
    &mut self,
    bundle: CollectionBundle,
    sender: UnboundedSender<(Row, i64, i64)>,
)
impl Context<'_, '_>
pub fn insert_global(&mut self, id: GlobalId, collection: CollectionBundle)
pub fn insert_local(&mut self, id: LocalId, collection: CollectionBundle)
pub fn insert_global_batch(
    &mut self,
    id: GlobalId,
    collection: CollectionBundle<Batch>,
)
pub fn insert_local_batch(
    &mut self,
    id: LocalId,
    collection: CollectionBundle<Batch>,
)
impl Context<'_, '_>
pub fn render_plan_batch(
    &mut self,
    plan: TypedPlan,
) -> Result<CollectionBundle<Batch>, Error>
Like render_plan, but in batch mode.
pub fn render_plan(
    &mut self,
    plan: TypedPlan,
) -> Result<CollectionBundle, Error>
Interprets the plan into a dataflow and prepares it for execution. Returns the output handler of this plan.
pub fn render_constant_batch(
    &mut self,
    rows: Vec<(Row, i64, i64)>,
    output_type: &RelationType,
) -> CollectionBundle<Batch>
Renders a Constant, taking all rows whose timestamp is not greater than the current time. This function is primarily used for testing. It always assumes the input is sorted by timestamp.
pub fn render_constant(
    &mut self,
    rows: Vec<(Row, i64, i64)>,
) -> CollectionBundle
Renders a Constant, taking all rows whose timestamp is not greater than the current time. It always assumes the input is sorted by timestamp.
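The sortedness assumption matters because it lets the due rows be found with a single binary search. The sketch below shows that idea only; `split_at_now` is a hypothetical helper, and the tuple is again read as `(row, timestamp, diff)`.

```rust
type Row = Vec<i64>;
/// Assumed layout: `(row, timestamp, diff)`.
type Update = (Row, i64, i64);

/// Split time-sorted `rows` into those due at `now` and those still in the future.
pub fn split_at_now(mut rows: Vec<Update>, now: i64) -> (Vec<Update>, Vec<Update>) {
    // `partition_point` is a binary search, so it is only correct when the
    // rows are already sorted by timestamp.
    let first_future = rows.partition_point(|(_, ts, _)| *ts <= now);
    let future = rows.split_off(first_future);
    (rows, future)
}
```

If the input were not sorted, the split point would be meaningless and some due rows could end up in the future half, which is why the documentation states the assumption explicitly.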
pub fn get_batch_by_id(
    &mut self,
    id: Id,
) -> Result<CollectionBundle<Batch>, Error>
pub fn get_by_id(&mut self, id: Id) -> Result<CollectionBundle, Error>
Trait Implementations
Auto Trait Implementations
impl<'referred, 'df> Freeze for Context<'referred, 'df>
impl<'referred, 'df> !RefUnwindSafe for Context<'referred, 'df>
impl<'referred, 'df> !Send for Context<'referred, 'df>
impl<'referred, 'df> !Sync for Context<'referred, 'df>
impl<'referred, 'df> Unpin for Context<'referred, 'df>
impl<'referred, 'df> !UnwindSafe for Context<'referred, 'df>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<T, V> Convert<T> for V
where
    V: Into<T>,
fn convert(value: Self) -> T
fn convert_box(value: Box<Self>) -> Box<T>
fn convert_vec(value: Vec<Self>) -> Vec<T>
fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>
fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>
fn convert_option(value: Option<Self>) -> Option<T>
fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>
fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    &'a Self: for<'a> IntoIterator,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.
impl<T, U, I> LiftInto<U, I> for T
where
    U: LiftFrom<T, I>,
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<T> PolicyExt for T
where
    T: ?Sized,
impl<Source> Sculptor<HNil, HNil> for Source
impl<SS, SP> SupersetOf<SS> for SP
where
    SS: SubsetOf<SP>,
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct self from the equivalent element of its superset.
fn is_in_subset(&self) -> bool
Checks if self is actually part of its subset T (and can be converted to it).
fn to_subset_unchecked(&self) -> SS
Use with care! Same as self.to_subset but without any property checks. Always succeeds.
fn from_subset(element: &SS) -> SP
The inclusion map: converts self to the equivalent element of its superset.
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.