use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
RegionRequestHeader,
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfoRef;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, FindNewColumnsOnInsertionSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu,
JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_factory::CreateExprFactory;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::statement::StatementExecutor;
/// Distributed inserter: resolves target tables, auto-creates/alters them on
/// demand, converts insert requests into per-region requests, and dispatches
/// them to datanodes (and, best-effort, to flownodes).
pub struct Inserter {
    // Resolves tables (catalog.schema.table) to `TableRef`s.
    catalog_manager: CatalogManagerRef,
    // Maps rows to regions and resolves region leaders.
    partition_manager: PartitionRuleManagerRef,
    // Provides clients for datanode and flownode peers.
    node_manager: NodeManagerRef,
    // Caches which flownodes consume each source table, for write mirroring.
    table_flownode_set_cache: TableFlownodeSetCacheRef,
}

/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
/// How a missing target table should be auto-created; also used as the label
/// for the `CREATE_ALTER_ON_DEMAND` metric (see `as_str`).
#[derive(Clone)]
enum AutoCreateTableType {
    /// A logical table in the metric engine, layered on the named physical table.
    Logical(String),
    /// An ordinary physical table (default engine).
    Physical,
    /// A log table: created with `append_mode = true`.
    Log,
    /// A table created with `merge_mode = last_non_null`.
    LastNonNull,
}
impl AutoCreateTableType {
fn as_str(&self) -> &'static str {
match self {
AutoCreateTableType::Logical(_) => "logical",
AutoCreateTableType::Physical => "physical",
AutoCreateTableType::Log => "log",
AutoCreateTableType::LastNonNull => "last_non_null",
}
}
}
/// Region insert requests split by the target table's TTL kind.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests for ordinary tables; these are written to datanodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests for tables with an "instant" TTL; these are only mirrored
    /// to flownodes, never persisted (see `Inserter::do_request`).
    pub instant_requests: RegionInsertRequests,
}
impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
) -> Self {
Self {
catalog_manager,
partition_manager,
node_manager,
table_flownode_set_cache,
}
}
pub async fn handle_column_inserts(
&self,
requests: InsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor)
.await
}
pub async fn handle_row_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Physical,
)
.await
}
pub async fn handle_log_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Log,
)
.await
}
pub async fn handle_last_non_null_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
)
.await
}
async fn handle_row_inserts_with_create_type(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<Output> {
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;
let (tables_info, instant_table_ids) = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let inserts = RowToRegion::new(
tables_info,
instant_table_ids,
self.partition_manager.as_ref(),
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
pub async fn handle_metric_row_inserts(
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String,
) -> Result<Output> {
requests.inserts.retain(|req| {
req.rows
.as_ref()
.map(|r| !r.rows.is_empty())
.unwrap_or_default()
});
validate_column_count_match(&requests)?;
self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
.await?;
let (tables_info, instant_table_ids) = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
pub async fn handle_table_insert(
&self,
request: TableInsertRequest,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table_name = request.table_name.as_str();
let table = self.get_table(catalog, schema, table_name).await?;
let table = table.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
})?;
let table_info = table.table_info();
let inserts = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(inserts, &ctx).await
}
pub async fn handle_statement_insert(
&self,
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let inserts =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
self.do_request(inserts, ctx).await
}
}
impl Inserter {
    /// Dispatches grouped region inserts: mirrors them to flownodes
    /// (best-effort), writes the normal requests to datanode region leaders,
    /// and aggregates the affected row counts.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Meter the write volume against the current catalog/schema.
        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror ALL requests (normal + instant) to the flownodes consuming
        // these source tables. This path is best-effort: every failure is
        // logged, none fails the user's write.
        match self
            .mirror_flow_node_requests(
                normal_requests
                    .requests
                    .iter()
                    .chain(instant_requests.requests.iter()),
            )
            .await
        {
            Ok(flow_requests) => {
                let node_manager = self.node_manager.clone();
                let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
                    let node_manager = node_manager.clone();
                    common_runtime::spawn_global(async move {
                        node_manager
                            .flownode(&peer)
                            .await
                            .handle_inserts(inserts)
                            .await
                            .context(RequestInsertsSnafu)
                    })
                });

                match future::try_join_all(flow_tasks)
                    .await
                    .context(JoinTaskSnafu)
                {
                    Ok(ret) => {
                        // A failed flownode response short-circuits the sum.
                        // Log it instead of silently dropping it (the old
                        // `unwrap_or(0)` hid such failures entirely).
                        match ret
                            .into_iter()
                            .map(|resp| resp.map(|r| r.affected_rows))
                            .sum::<Result<u64>>()
                        {
                            Ok(affected_rows) => {
                                crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                            }
                            Err(err) => {
                                warn!(err; "Failed to insert data into flownode");
                            }
                        }
                    }
                    Err(err) => {
                        warn!(err; "Failed to insert data into flownode");
                    }
                }
            }
            Err(err) => warn!(err; "Failed to mirror request to flownode"),
        }

        // Only normal requests are persisted; instant requests were mirrored
        // above and go no further.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }

    /// Groups region insert requests by the flownode peers consuming their
    /// source tables.
    ///
    /// Tables with no flownode are remembered as `None` so the flownode-set
    /// cache is queried at most once per table.
    async fn mirror_flow_node_requests<'it, 'zelf: 'it>(
        &'zelf self,
        requests: impl Iterator<Item = &'it RegionInsertRequest>,
    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
        // Table id -> (flownode peers, requests targeting that table);
        // `None` marks a table known to have no flownodes.
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();
        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            match src_table_reqs.get_mut(&table_id) {
                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                // Known to have no flownode consumers; skip.
                Some(None) => continue,
                // First sighting of this table: look up its flownode set.
                _ => {
                    let peers = self
                        .table_flownode_set_cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<Vec<_>>();
                    if !peers.is_empty() {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        src_table_reqs.insert(table_id, Some((peers, reqs)));
                    } else {
                        src_table_reqs.insert(table_id, None);
                    }
                }
            }
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (_table_id, (peers, reqs)) in src_table_reqs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
        {
            if let [peer] = peers.as_slice() {
                // Single consumer: hand the requests over without cloning.
                inserts
                    .entry(peer.clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
            } else {
                // Multiple consumers: each flownode gets its own copy.
                for flownode in peers {
                    inserts
                        .entry(flownode)
                        .or_default()
                        .requests
                        .extend(reqs.requests.clone());
                }
            }
        }
        Ok(inserts)
    }

    /// Groups region insert requests by the datanode peer currently leading
    /// each target region.
    async fn group_requests_by_peer(
        &self,
        requests: RegionInsertRequests,
    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
        // Bucket by region first so each region leader is resolved once.
        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
        for req in requests.requests {
            let region_id = RegionId::from_u64(req.region_id);
            requests_per_region
                .entry(region_id)
                .or_default()
                .requests
                .push(req);
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
        for (region_id, reqs) in requests_per_region {
            let peer = self
                .partition_manager
                .find_region_leader(region_id)
                .await
                .context(FindRegionLeaderSnafu)?;
            inserts
                .entry(peer)
                .or_default()
                .requests
                .extend(reqs.requests);
        }

        Ok(inserts)
    }

    /// Creates missing target tables and alters existing ones that lack
    /// columns present in the requests, unless the `auto_create_table` hint
    /// is disabled (then missing tables are an error).
    ///
    /// Returns the resolved table infos keyed by table name, plus the ids of
    /// tables with an "instant" TTL (whose requests are only mirrored to
    /// flownodes — see [`Self::do_request`]).
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
    ) -> Result<(HashMap<String, TableInfoRef>, HashSet<TableId>)> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut tables_info = HashMap::with_capacity(requests.inserts.len());

        // The hint defaults to true; a non-boolean value is rejected.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-creation disabled: every target table must already exist.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                tables_info.insert(table_info.name.clone(), table_info);
            }
            return Ok((tables_info, instant_table_ids));
        }

        // Collect the required creations and alterations in one pass.
        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut instant_table_ids = HashSet::new();

        for req in &requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    tables_info.insert(table_info.name.clone(), table_info);
                    if let Some(alter_expr) =
                        self.get_alter_table_expr_on_demand(req, &table, ctx)?
                    {
                        alter_tables.push(alter_expr);
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                // Logical (metric-engine) tables are created/altered in batches.
                if !create_tables.is_empty() {
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;
                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        tables_info.insert(table_info.name.clone(), table_info);
                    }
                }
                if !alter_tables.is_empty() {
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Non-logical tables are created/altered one by one.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    tables_info.insert(table_info.name.clone(), table_info);
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        Ok((tables_info, instant_table_ids))
    }

    /// Ensures the named metric-engine physical table exists, creating it
    /// with a minimal default schema (millisecond timestamp + float value)
    /// if it does not.
    async fn create_physical_table_on_demand(
        &self,
        ctx: &QueryContextRef,
        physical_table: String,
        statement_executor: &StatementExecutor,
    ) -> Result<()> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        // Fast path: nothing to do when the table already exists.
        if self
            .get_table(catalog_name, &schema_name, &physical_table)
            .await?
            .is_some()
        {
            return Ok(());
        }

        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
        info!("Physical metric table `{table_reference}` does not exist, try creating table");

        let default_schema = vec![
            ColumnSchema {
                column_name: GREPTIME_TIMESTAMP.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: GREPTIME_VALUE.to_string(),
                datatype: ColumnDataType::Float64 as _,
                semantic_type: SemanticType::Field as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let create_table_expr =
            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

        // Override the default engine and mark the table as physical.
        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        create_table_expr
            .table_options
            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

        let res = statement_executor
            .create_table_inner(create_table_expr, None, ctx.clone())
            .await;

        match res {
            Ok(_) => {
                info!("Successfully created table {table_reference}");
                Ok(())
            }
            Err(err) => {
                error!(err; "Failed to create table {table_reference}");
                Err(err)
            }
        }
    }

    /// Looks up a table in the catalog; `Ok(None)` when it does not exist.
    async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)
    }

    /// Builds a `CREATE TABLE` expression for a missing target table,
    /// attaching table options derived from `create_type` and the query
    /// context (TTL, append/merge mode, logical-table metadata).
    fn get_create_table_expr_on_demand(
        &self,
        req: &RowInsertRequest,
        create_type: &AutoCreateTableType,
        ctx: &QueryContextRef,
    ) -> Result<CreateTableExpr> {
        let mut table_options = Vec::with_capacity(4);
        if let Some(ttl) = ctx.extension(TTL_KEY) {
            table_options.push((TTL_KEY, ttl));
        }

        let mut engine_name = default_engine();
        match create_type {
            AutoCreateTableType::Logical(physical_table) => {
                engine_name = METRIC_ENGINE_NAME;
                table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
            }
            AutoCreateTableType::Physical => {
                // Append/merge mode may be overridden via context extensions.
                if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
                    table_options.push((APPEND_MODE_KEY, append_mode));
                }
                if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
                    table_options.push((MERGE_MODE_KEY, merge_mode));
                }
            }
            // Log tables are always append-only.
            AutoCreateTableType::Log => {
                table_options.push((APPEND_MODE_KEY, "true"));
            }
            AutoCreateTableType::LastNonNull => {
                table_options.push((MERGE_MODE_KEY, "last_non_null"));
            }
        }

        let schema = ctx.current_schema();
        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
        // Callers filter out requests without rows before reaching here,
        // so `rows` is present.
        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let mut create_table_expr =
            build_create_table_expr(&table_ref, request_schema, engine_name)?;

        info!("Table `{table_ref}` does not exist, try creating table");

        for (k, v) in table_options {
            create_table_expr
                .table_options
                .insert(k.to_string(), v.to_string());
        }

        Ok(create_table_expr)
    }

    /// Returns an `ALTER TABLE ... ADD COLUMNS` expression when the request
    /// carries columns the table does not have yet; `None` otherwise.
    fn get_alter_table_expr_on_demand(
        &self,
        req: &RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        // Callers filter out requests without rows before reaching here.
        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = extract_new_columns(&table.schema(), column_exprs)
            .context(FindNewColumnsOnInsertionSnafu)?;
        let Some(add_columns) = add_columns else {
            return Ok(None);
        };

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.to_string(),
            // Already an owned `String`; no extra `to_string()` needed.
            table_name,
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }

    /// Creates a single (non-logical) table, logging success and failure.
    async fn create_physical_table(
        &self,
        mut create_table_expr: CreateTableExpr,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<TableRef> {
        {
            // Scoped so the borrow of `create_table_expr` ends before the
            // mutable borrow taken by `create_table_inner` below.
            let table_ref = TableReference::full(
                &create_table_expr.catalog_name,
                &create_table_expr.schema_name,
                &create_table_expr.table_name,
            );
            info!("Table `{table_ref}` does not exist, try creating table");
        }
        let res = statement_executor
            .create_table_inner(&mut create_table_expr, None, ctx.clone())
            .await;

        let table_ref = TableReference::full(
            &create_table_expr.catalog_name,
            &create_table_expr.schema_name,
            &create_table_expr.table_name,
        );

        match res {
            Ok(table) => {
                info!(
                    "Successfully created table {} with options: {:?}",
                    table_ref, create_table_expr.table_options,
                );
                Ok(table)
            }
            Err(err) => {
                error!(err; "Failed to create table {}", table_ref);
                Err(err)
            }
        }
    }

    /// Creates a batch of logical (metric-engine) tables in one call.
    async fn create_logical_tables(
        &self,
        create_table_exprs: Vec<CreateTableExpr>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Vec<TableRef>> {
        let res = statement_executor
            .create_logical_tables(&create_table_exprs, ctx.clone())
            .await;

        match res {
            Ok(res) => {
                info!("Successfully created logical tables");
                Ok(res)
            }
            Err(err) => {
                // List every table of the failed batch for diagnosis.
                let failed_tables = create_table_exprs
                    .into_iter()
                    .map(|expr| {
                        format!(
                            "{}.{}.{}",
                            expr.catalog_name, expr.schema_name, expr.table_name
                        )
                    })
                    .collect::<Vec<_>>();
                error!(
                    err;
                    "Failed to create logical tables {:?}",
                    failed_tables
                );
                Err(err)
            }
        }
    }
}
/// Checks that every row in every request carries exactly as many values as
/// its schema declares columns.
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
    for request in &requests.inserts {
        // Callers filter out requests without rows before validating.
        let rows = request.rows.as_ref().unwrap();
        let column_count = rows.schema.len();
        for row in &rows.rows {
            ensure!(
                row.values.len() == column_count,
                InvalidInsertRequestSnafu {
                    reason: format!(
                        "column count mismatch, columns: {}, values: {}",
                        column_count,
                        row.values.len()
                    )
                }
            );
        }
    }
    Ok(())
}
/// Builds a `CreateTableExpr` for `table`, inferring column definitions from
/// `request_schema` and targeting the given storage `engine`.
fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}