flow

Struct FlowWorkerManager

Source
pub struct FlowWorkerManager {
Show 13 fields pub worker_handles: Vec<WorkerHandle>, worker_selector: Mutex<usize>, pub query_engine: Arc<dyn QueryEngine>, table_info_source: ManagedTableSource, frontend_invoker: RwLock<Option<FrontendInvoker>>, node_context: RwLock<FlownodeContext>, refill_tasks: RwLock<BTreeMap<u64, RefillTask>>, flow_err_collectors: RwLock<BTreeMap<u64, ErrCollector>>, src_send_buf_lens: RwLock<BTreeMap<TableId, Receiver<usize>>>, tick_manager: FlowTickManager, node_id: Option<u32>, flush_lock: RwLock<()>, state_report_handler: RwLock<Option<Receiver<Sender<FlowStat>>>>,
}
Expand description

FlowWorkerManager manages the state of all tasks in the flow node, all of which should be run on the same thread.

The choice of timestamp is just using current system timestamp for now

Fields§

§worker_handles: Vec<WorkerHandle>

The handles to the workers that run the dataflow; the dataflow itself is !Send, so a handle is used to communicate with it

§worker_selector: Mutex<usize>

The selector to select a worker to run the dataflow

§query_engine: Arc<dyn QueryEngine>

The query engine that will be used to parse the query and convert it to a dataflow plan

§table_info_source: ManagedTableSource

Getting table name and table schema from table info manager

§frontend_invoker: RwLock<Option<FrontendInvoker>>

§node_context: RwLock<FlownodeContext>

Contains the mapping from table name to global id, and the table schema

§refill_tasks: RwLock<BTreeMap<u64, RefillTask>>

Contains all refill tasks

§flow_err_collectors: RwLock<BTreeMap<u64, ErrCollector>>

§src_send_buf_lens: RwLock<BTreeMap<TableId, Receiver<usize>>>

§tick_manager: FlowTickManager

§node_id: Option<u32>

§flush_lock: RwLock<()>

Lock for flushing; it is read-locked by handle_inserts and write-locked by flush_flow

so that a series of events like inserts -> flush can be handled correctly

§state_report_handler: RwLock<Option<Receiver<Sender<FlowStat>>>>

Receives a oneshot sender that is used to send back the state size report

Implementations§

Source§

impl FlowWorkerManager

Source

pub async fn create_and_start_refill_flow_tasks( self: &FlowWorkerManagerRef, flow_metadata_manager: &FlowMetadataManagerRef, catalog_manager: &CatalogManagerRef, ) -> Result<(), Error>

Create and start refill flow tasks in background

Source

pub async fn create_refill_flow_tasks( &self, flow_metadata_manager: &FlowMetadataManagerRef, catalog_manager: &CatalogManagerRef, ) -> Result<Vec<RefillTask>, Error>

Create a series of tasks to refill flow

Source

pub(crate) async fn starting_refill_flows( self: &FlowWorkerManagerRef, tasks: Vec<RefillTask>, ) -> Result<(), Error>

Starting to refill flows, if any error occurs, will rebuild the flow and retry

Source§

impl FlowWorkerManager

Source

pub async fn gen_state_report(&self) -> FlowStat

Source§

impl FlowWorkerManager

Source

pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle

Get a worker handle for creating flow, using round robin to select a worker

Source

pub(crate) async fn create_table_from_relation( &self, flow_name: &str, table_name: &[String; 3], relation_desc: &RelationDesc, ) -> Result<bool, Error>

Create a table from the given schema (adjusted to add auto columns if needed). Returns true if the table is created

Source

pub(crate) async fn try_fetch_existing_table( &self, table_name: &[String; 3], ) -> Result<Option<(bool, Vec<ColumnSchema>)>, Error>

Try fetch table with adjusted schema(added auto column if needed)

Source

pub(crate) async fn submit_create_sink_table_ddl( &self, create_table: CreateTableExpr, ) -> Result<(), Error>

submit a create table ddl

Source§

impl FlowWorkerManager

Building FlownodeManager

Source

pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker)

set frontend invoker

Source

pub fn new( node_id: Option<u32>, query_engine: Arc<dyn QueryEngine>, table_meta: TableMetadataManagerRef, ) -> Self

Create without setting frontend_invoker

Source

pub async fn with_state_report_handler( self, handler: Receiver<Sender<FlowStat>>, ) -> Self

Source

pub fn new_with_workers<'s>( node_id: Option<u32>, query_engine: Arc<dyn QueryEngine>, table_meta: TableMetadataManagerRef, num_workers: usize, ) -> (Self, Vec<Worker<'s>>)

Create a flownode manager with num_workers workers

Source

pub fn add_worker_handle(&mut self, handle: WorkerHandle)

Add a worker handle to the manager, meaning the corresponding worker is under its management

Source§

impl FlowWorkerManager

This impl block contains methods to send writeback requests to frontend

Source

pub async fn send_writeback_requests(&self) -> Result<usize, Error>

Return the number of requests it made

Source

pub async fn generate_writeback_request( &self, ) -> Result<BTreeMap<[String; 3], Vec<DiffRequest>>, Error>

Generate writeback request for all sink table

Source

async fn fetch_table_pk_schema( &self, table_name: &[String; 3], ) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error>

Fetch table schema and primary key from table info source, if table not exist return None

Source

async fn adjust_auto_created_table_schema( &self, schema: &RelationDesc, ) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error>

Return (primary keys, schema, and whether the table has a placeholder timestamp column). The schema of the table comes from the flow’s output plan

adjust to add update_at column and ts placeholder if needed

Source§

impl FlowWorkerManager

Flow Runtime related methods

Source

async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>>

Start state report handler, which will receive a sender from HeartbeatTask to send state size report back

If the heartbeat task is shut down, this future will exit too

Source

pub fn run_background( self: Arc<Self>, shutdown: Option<Receiver<()>>, ) -> JoinHandle<()>

run in common_runtime background runtime

Source

pub async fn log_all_errors(&self)

log all flow errors

Source

pub async fn run(&self, shutdown: Option<Receiver<()>>)

Trigger dataflow running, and then send writeback request to the source sender

Note that this method doesn’t handle input mirror requests, as those should be handled by the grpc server

Source

pub async fn run_available(&self, blocking: bool) -> Result<usize, Error>

Run all available subgraphs in the flow node. This will try to run all dataflows in this node

Set blocking to true to wait until the worker finishes running, or false to just trigger a run and return immediately. Returns the number of rows sent to the worker (inaccurate). TODO(discord9): add flag for subgraph that have input since last run

Source

pub async fn handle_write_request( &self, region_id: RegionId, rows: Vec<(Row, i64, i64)>, batch_datatypes: &[ConcreteDataType], ) -> Result<(), Error>

send write request to related source sender

Source§

impl FlowWorkerManager

Create&Remove flow

Source

pub async fn remove_flow(&self, flow_id: u64) -> Result<(), Error>

Remove a flow by its id

Source

pub async fn create_flow( &self, args: CreateFlowArgs, ) -> Result<Option<u64>, Error>

Return task id if a new task is created, otherwise return None

steps to create task:

  1. parse query into typed plan(and optional parse expire_after expr)
  2. render source/sink with output table id and used input table id

Trait Implementations§

Source§

impl Flownode for FlowWorkerManager

Source§

fn handle<'life0, 'async_trait>( &'life0 self, request: FlowRequest, ) -> Pin<Box<dyn Future<Output = Result<FlowResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn handle_inserts<'life0, 'async_trait>( &'life0 self, request: InsertRequests, ) -> Pin<Box<dyn Future<Output = Result<FlowResponse>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> Conv for T

§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>. Read more
§

impl<T, V> Convert<T> for V
where V: Into<T>,

§

fn convert(value: Self) -> T

§

fn convert_box(value: Box<Self>) -> Box<T>

§

fn convert_vec(value: Vec<Self>) -> Vec<T>

§

fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>

§

fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>

§

fn convert_option(value: Option<Self>) -> Option<T>

§

fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>

§

fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>

§

impl<Choices> CoproductSubsetter<CNil, HNil> for Choices

§

type Remainder = Choices

§

fn subset( self, ) -> Result<CNil, <Choices as CoproductSubsetter<CNil, HNil>>::Remainder>

Extract a subset of the possible types in a coproduct (or get the remaining possibilities) Read more
§

impl<T> FmtForward for T

§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
§

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
§

impl<T, U, I> LiftInto<U, I> for T
where U: LiftFrom<T, I>,

§

fn lift_into(self) -> U

Performs the indexed conversion.
§

impl<T> Pipe for T
where T: ?Sized,

§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
§

impl<T> Pointable for T

§

const ALIGN: usize

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] only if self and other return Action::Follow. Read more
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns [Action::Follow] if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<Source> Sculptor<HNil, HNil> for Source

§

type Remainder = Source

§

fn sculpt(self) -> (HNil, <Source as Sculptor<HNil, HNil>>::Remainder)

Consumes the current HList and returns an HList with the requested shape. Read more
§

impl<SS, SP> SupersetOf<SS> for SP
where SS: SubsetOf<SP>,

§

fn to_subset(&self) -> Option<SS>

The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
§

fn is_in_subset(&self) -> bool

Checks if self is actually part of its subset T (and can be converted to it).
§

fn to_subset_unchecked(&self) -> SS

Use with care! Same as self.to_subset but without any property checks. Always succeeds.
§

fn from_subset(element: &SS) -> SP

The inclusion map: converts self to the equivalent element of its superset.
§

impl<T> Tap for T

§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
§

impl<T> TryConv for T

§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

impl<G1, G2> Within<G2> for G1
where G2: Contains<G1>,

§

fn is_within(&self, b: &G2) -> bool

§

impl<T> Any for T
where T: Any,

§

impl<T> ErasedDestructor for T
where T: 'static,

§

impl<T> MaybeSend for T
where T: Send,

§

impl<T> MaybeSendSync for T