pub struct Inserter {
catalog_manager: CatalogManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
Fields§
§catalog_manager: CatalogManagerRef
§partition_manager: PartitionRuleManagerRef
§node_manager: NodeManagerRef
§table_flownode_set_cache: TableFlownodeSetCacheRef
Implementations§
Source§impl Inserter
impl Inserter
Sourcepub async fn handle_bulk_insert(
&self,
table: TableRef,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows>
pub async fn handle_bulk_insert( &self, table: TableRef, decoder: &mut FlightDecoder, data: FlightData, ) -> Result<AffectedRows>
Handle bulk insert request.
fn maybe_update_flow_dirty_window( &self, table_info: TableInfoRef, record_batch: RecordBatch, )
Source§impl Inserter
impl Inserter
pub fn new( catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, table_flownode_set_cache: TableFlownodeSetCacheRef, ) -> Self
pub async fn handle_column_inserts( &self, requests: InsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, ) -> Result<Output>
Sourcepub async fn handle_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output>
pub async fn handle_row_inserts( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result<Output>
Handles row inserts request and creates a physical table on demand.
Sourcepub async fn handle_log_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output>
pub async fn handle_log_inserts( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, ) -> Result<Output>
Handles row inserts request and creates a log table on demand.
pub async fn handle_trace_inserts( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, ) -> Result<Output>
Sourcepub async fn handle_last_non_null_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output>
pub async fn handle_last_non_null_inserts( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result<Output>
Handles row inserts request and creates a table with last_non_null
merge mode on demand.
Sourceasync fn handle_row_inserts_with_create_type(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output>
async fn handle_row_inserts_with_create_type( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, create_type: AutoCreateTableType, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result<Output>
Handles row inserts request with specified AutoCreateTableType.
Sourcepub async fn handle_metric_row_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String,
) -> Result<Output>
pub async fn handle_metric_row_inserts( &self, requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, physical_table: String, ) -> Result<Output>
Handles row inserts request with metric engine.
pub async fn handle_table_insert( &self, request: TableInsertRequest, ctx: QueryContextRef, ) -> Result<Output>
pub async fn handle_statement_insert( &self, insert: &Insert, ctx: &QueryContextRef, ) -> Result<Output>
Source§impl Inserter
impl Inserter
async fn do_request( &self, requests: InstantAndNormalInsertRequests, table_infos: &HashMap<TableId, Arc<TableInfo>>, ctx: &QueryContextRef, ) -> Result<Output>
async fn group_requests_by_peer( &self, requests: RegionInsertRequests, ) -> Result<HashMap<Peer, RegionInsertRequests>>
Sourceasync fn create_or_alter_tables_on_demand(
&self,
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult>
async fn create_or_alter_tables_on_demand( &self, requests: &mut RowInsertRequests, ctx: &QueryContextRef, auto_create_table_type: AutoCreateTableType, statement_executor: &StatementExecutor, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result<CreateAlterTableResult>
Creates or alters tables on demand:
- if the table does not exist, creates the table from the inferred CreateExpr
- if the table exists, checks whether the schema matches. If any new column is found, alters the table using the inferred AlterExpr
Returns a mapping from table name to table id, where table name is the table name involved in the requests. This mapping is used in the conversion of RowToRegion.
accommodate_existing_schema is used to determine if the existing schema should override the new schema.
It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with a
custom schema, and then inserts data with endpoints that have a default schema setting, like prometheus
remote write. This will modify the RowInsertRequests in place.
is_single_value indicates whether the default schema only contains a single value column so we can accommodate it.
async fn create_physical_table_on_demand( &self, ctx: &QueryContextRef, physical_table: String, statement_executor: &StatementExecutor, ) -> Result<()>
async fn get_table( &self, catalog: &str, schema: &str, table: &str, ) -> Result<Option<TableRef>>
fn get_create_table_expr_on_demand( &self, req: &RowInsertRequest, create_type: &AutoCreateTableType, ctx: &QueryContextRef, ) -> Result<CreateTableExpr>
Sourcefn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Option<AlterTableExpr>>
fn get_alter_table_expr_on_demand( &self, req: &mut RowInsertRequest, table: &TableRef, ctx: &QueryContextRef, accommodate_existing_schema: bool, is_single_value: bool, ) -> Result<Option<AlterTableExpr>>
Returns an alter table expression if it finds new columns in the request.
When accommodate_existing_schema is false, it always adds columns if they do not exist.
When accommodate_existing_schema is true, it may modify the input req to accommodate it with the existing schema. See create_or_alter_tables_on_demand for more details.
When accommodate_existing_schema is true and is_single_value is true, it also considers fields when modifying the input req.
Sourceasync fn create_physical_table(
&self,
create_table_expr: CreateTableExpr,
partitions: Option<Partitions>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef>
async fn create_physical_table( &self, create_table_expr: CreateTableExpr, partitions: Option<Partitions>, ctx: &QueryContextRef, statement_executor: &StatementExecutor, ) -> Result<TableRef>
Creates a table with options.
async fn create_logical_tables( &self, create_table_exprs: Vec<CreateTableExpr>, ctx: &QueryContextRef, statement_executor: &StatementExecutor, ) -> Result<Vec<TableRef>>
pub fn node_manager(&self) -> &NodeManagerRef
pub fn partition_manager(&self) -> &PartitionRuleManagerRef
Auto Trait Implementations§
impl Freeze for Inserter
impl !RefUnwindSafe for Inserter
impl Send for Inserter
impl Sync for Inserter
impl Unpin for Inserter
impl !UnwindSafe for Inserter
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T, V> Convert<T> for Vwhere
V: Into<T>,
impl<T, V> Convert<T> for Vwhere
V: Into<T>,
fn convert(value: Self) -> T
fn convert_box(value: Box<Self>) -> Box<T>
fn convert_vec(value: Vec<Self>) -> Vec<T>
fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>
fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>
fn convert_option(value: Option<Self>) -> Option<T>
fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>
fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes self to use its Binary implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes self to use its Display implementation when Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes self to use its LowerExp implementation when Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes self to use its LowerHex implementation when Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes self to use its Octal implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes self to use its Pointer implementation when Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes self to use its UpperExp implementation when Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes self to use its UpperHex implementation when Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
self
from the equivalent element of its
superset. Read more§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
self
is actually part of its subset T
(and can be converted to it).§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
self.to_subset
but without any property checks. Always succeeds.§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
self
to the equivalent element of its superset.§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.