1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
32 TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
39use common_query::Output;
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::prelude::*;
48use snafu::ResultExt;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
56use store_api::storage::{RegionId, TableId};
57use table::metadata::TableInfo;
58use table::requests::{
59 InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
60 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
61};
62use table::table_reference::TableReference;
63use table::TableRef;
64
65use crate::error::{
66 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
67 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
68};
69use crate::expr_helper;
70use crate::region_req_factory::RegionRequestFactory;
71use crate::req_convert::common::preprocess_row_insert_requests;
72use crate::req_convert::insert::{
73 fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
74};
75use crate::statement::StatementExecutor;
76
/// Entry point for handling insert requests of every supported shape.
///
/// It looks tables up through the catalog manager, converts requests into region
/// requests with the help of the partition manager, sends them to datanodes through
/// the node manager, and mirrors them to the flownodes found in the
/// table-to-flownode cache.
pub struct Inserter {
    /// Used to look up tables by catalog, schema and table name.
    catalog_manager: CatalogManagerRef,
    /// Used when converting requests into region requests and to find region leaders.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Provides the datanode and flownode clients that requests are sent to.
    pub(crate) node_manager: NodeManagerRef,
    /// Queried per table id for the flownodes that inserts are mirrored to.
    table_flownode_set_cache: TableFlownodeSetCacheRef,
}
83
/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
85
/// Kind of table that is created automatically when an insert targets a table
/// that does not exist yet.
#[derive(Clone)]
enum AutoCreateTableType {
    /// A logical table; the payload is the name of the physical table it belongs to.
    Logical(String),
    /// A plain physical table; append/merge mode may be taken from the query context.
    Physical,
    /// A log table, created with append mode enabled.
    Log,
    /// A table created with the `last_non_null` merge mode.
    LastNonNull,
    /// A trace table, created with append mode enabled.
    Trace,
}

impl AutoCreateTableType {
    /// Returns the fixed lowercase label of this variant (used as a metric label value).
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
112
/// Region insert requests split into two groups that are processed differently.
///
/// In this file, `do_request` writes only `normal_requests` to datanodes, while both
/// groups are mirrored to flownodes.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests that are written to datanodes (and mirrored to flownodes).
    pub normal_requests: RegionInsertRequests,
    /// Requests that are only mirrored to flownodes and never written to datanodes.
    /// NOTE(review): presumably these target TTL-instant tables; confirm against the
    /// request converters that build this struct.
    pub instant_requests: RegionInsertRequests,
}
127
128impl Inserter {
129 pub fn new(
130 catalog_manager: CatalogManagerRef,
131 partition_manager: PartitionRuleManagerRef,
132 node_manager: NodeManagerRef,
133 table_flownode_set_cache: TableFlownodeSetCacheRef,
134 ) -> Self {
135 Self {
136 catalog_manager,
137 partition_manager,
138 node_manager,
139 table_flownode_set_cache,
140 }
141 }
142
143 pub async fn handle_column_inserts(
144 &self,
145 requests: InsertRequests,
146 ctx: QueryContextRef,
147 statement_executor: &StatementExecutor,
148 ) -> Result<Output> {
149 let row_inserts = ColumnToRow::convert(requests)?;
150 self.handle_row_inserts(row_inserts, ctx, statement_executor)
151 .await
152 }
153
154 pub async fn handle_row_inserts(
156 &self,
157 mut requests: RowInsertRequests,
158 ctx: QueryContextRef,
159 statement_executor: &StatementExecutor,
160 ) -> Result<Output> {
161 preprocess_row_insert_requests(&mut requests.inserts)?;
162 self.handle_row_inserts_with_create_type(
163 requests,
164 ctx,
165 statement_executor,
166 AutoCreateTableType::Physical,
167 )
168 .await
169 }
170
171 pub async fn handle_log_inserts(
173 &self,
174 requests: RowInsertRequests,
175 ctx: QueryContextRef,
176 statement_executor: &StatementExecutor,
177 ) -> Result<Output> {
178 self.handle_row_inserts_with_create_type(
179 requests,
180 ctx,
181 statement_executor,
182 AutoCreateTableType::Log,
183 )
184 .await
185 }
186
187 pub async fn handle_trace_inserts(
188 &self,
189 requests: RowInsertRequests,
190 ctx: QueryContextRef,
191 statement_executor: &StatementExecutor,
192 ) -> Result<Output> {
193 self.handle_row_inserts_with_create_type(
194 requests,
195 ctx,
196 statement_executor,
197 AutoCreateTableType::Trace,
198 )
199 .await
200 }
201
202 pub async fn handle_last_non_null_inserts(
204 &self,
205 requests: RowInsertRequests,
206 ctx: QueryContextRef,
207 statement_executor: &StatementExecutor,
208 ) -> Result<Output> {
209 self.handle_row_inserts_with_create_type(
210 requests,
211 ctx,
212 statement_executor,
213 AutoCreateTableType::LastNonNull,
214 )
215 .await
216 }
217
218 async fn handle_row_inserts_with_create_type(
220 &self,
221 mut requests: RowInsertRequests,
222 ctx: QueryContextRef,
223 statement_executor: &StatementExecutor,
224 create_type: AutoCreateTableType,
225 ) -> Result<Output> {
226 requests.inserts.retain(|req| {
228 req.rows
229 .as_ref()
230 .map(|r| !r.rows.is_empty())
231 .unwrap_or_default()
232 });
233 validate_column_count_match(&requests)?;
234
235 let CreateAlterTableResult {
236 instant_table_ids,
237 table_infos,
238 } = self
239 .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
240 .await?;
241
242 let name_to_info = table_infos
243 .values()
244 .map(|info| (info.name.clone(), info.clone()))
245 .collect::<HashMap<_, _>>();
246 let inserts = RowToRegion::new(
247 name_to_info,
248 instant_table_ids,
249 self.partition_manager.as_ref(),
250 )
251 .convert(requests)
252 .await?;
253
254 self.do_request(inserts, &table_infos, &ctx).await
255 }
256
257 pub async fn handle_metric_row_inserts(
259 &self,
260 mut requests: RowInsertRequests,
261 ctx: QueryContextRef,
262 statement_executor: &StatementExecutor,
263 physical_table: String,
264 ) -> Result<Output> {
265 requests.inserts.retain(|req| {
267 req.rows
268 .as_ref()
269 .map(|r| !r.rows.is_empty())
270 .unwrap_or_default()
271 });
272 validate_column_count_match(&requests)?;
273
274 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
276 .await?;
277
278 let CreateAlterTableResult {
280 instant_table_ids,
281 table_infos,
282 } = self
283 .create_or_alter_tables_on_demand(
284 &requests,
285 &ctx,
286 AutoCreateTableType::Logical(physical_table.to_string()),
287 statement_executor,
288 )
289 .await?;
290 let name_to_info = table_infos
291 .values()
292 .map(|info| (info.name.clone(), info.clone()))
293 .collect::<HashMap<_, _>>();
294 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
295 .convert(requests)
296 .await?;
297
298 self.do_request(inserts, &table_infos, &ctx).await
299 }
300
301 pub async fn handle_table_insert(
302 &self,
303 request: TableInsertRequest,
304 ctx: QueryContextRef,
305 ) -> Result<Output> {
306 let catalog = request.catalog_name.as_str();
307 let schema = request.schema_name.as_str();
308 let table_name = request.table_name.as_str();
309 let table = self.get_table(catalog, schema, table_name).await?;
310 let table = table.with_context(|| TableNotFoundSnafu {
311 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
312 })?;
313 let table_info = table.table_info();
314
315 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
316 .convert(request)
317 .await?;
318
319 let table_infos =
320 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
321
322 self.do_request(inserts, &table_infos, &ctx).await
323 }
324
325 pub async fn handle_statement_insert(
326 &self,
327 insert: &Insert,
328 ctx: &QueryContextRef,
329 ) -> Result<Output> {
330 let (inserts, table_info) =
331 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
332 .convert(insert, ctx)
333 .await?;
334
335 let table_infos =
336 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
337
338 self.do_request(inserts, &table_infos, ctx).await
339 }
340}
341
342impl Inserter {
343 async fn do_request(
344 &self,
345 requests: InstantAndNormalInsertRequests,
346 table_infos: &HashMap<TableId, Arc<TableInfo>>,
347 ctx: &QueryContextRef,
348 ) -> Result<Output> {
349 let requests = fill_reqs_with_impure_default(table_infos, requests)?;
351
352 let write_cost = write_meter!(
353 ctx.current_catalog(),
354 ctx.current_schema(),
355 requests,
356 ctx.channel() as u8
357 );
358 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
359 tracing_context: TracingContext::from_current_span().to_w3c(),
360 dbname: ctx.get_db_string(),
361 ..Default::default()
362 });
363
364 let InstantAndNormalInsertRequests {
365 normal_requests,
366 instant_requests,
367 } = requests;
368
369 let flow_mirror_task = FlowMirrorTask::new(
371 &self.table_flownode_set_cache,
372 normal_requests
373 .requests
374 .iter()
375 .chain(instant_requests.requests.iter()),
376 )
377 .await?;
378 flow_mirror_task.detach(self.node_manager.clone())?;
379
380 let write_tasks = self
382 .group_requests_by_peer(normal_requests)
383 .await?
384 .into_iter()
385 .map(|(peer, inserts)| {
386 let node_manager = self.node_manager.clone();
387 let request = request_factory.build_insert(inserts);
388 common_runtime::spawn_global(async move {
389 node_manager
390 .datanode(&peer)
391 .await
392 .handle(request)
393 .await
394 .context(RequestInsertsSnafu)
395 })
396 });
397 let results = future::try_join_all(write_tasks)
398 .await
399 .context(JoinTaskSnafu)?;
400 let affected_rows = results
401 .into_iter()
402 .map(|resp| resp.map(|r| r.affected_rows))
403 .sum::<Result<AffectedRows>>()?;
404 crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
405 Ok(Output::new(
406 OutputData::AffectedRows(affected_rows),
407 OutputMeta::new_with_cost(write_cost as _),
408 ))
409 }
410
411 async fn group_requests_by_peer(
412 &self,
413 requests: RegionInsertRequests,
414 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
415 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
418 for req in requests.requests {
419 let region_id = RegionId::from_u64(req.region_id);
420 requests_per_region
421 .entry(region_id)
422 .or_default()
423 .requests
424 .push(req);
425 }
426
427 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
428
429 for (region_id, reqs) in requests_per_region {
430 let peer = self
431 .partition_manager
432 .find_region_leader(region_id)
433 .await
434 .context(FindRegionLeaderSnafu)?;
435 inserts
436 .entry(peer)
437 .or_default()
438 .requests
439 .extend(reqs.requests);
440 }
441
442 Ok(inserts)
443 }
444
445 async fn create_or_alter_tables_on_demand(
452 &self,
453 requests: &RowInsertRequests,
454 ctx: &QueryContextRef,
455 auto_create_table_type: AutoCreateTableType,
456 statement_executor: &StatementExecutor,
457 ) -> Result<CreateAlterTableResult> {
458 let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
459 .with_label_values(&[auto_create_table_type.as_str()])
460 .start_timer();
461
462 let catalog = ctx.current_catalog();
463 let schema = ctx.current_schema();
464
465 let mut table_infos = HashMap::new();
466 let auto_create_table_hint = ctx
468 .extension(AUTO_CREATE_TABLE_KEY)
469 .map(|v| v.parse::<bool>())
470 .transpose()
471 .map_err(|_| {
472 InvalidInsertRequestSnafu {
473 reason: "`auto_create_table` hint must be a boolean",
474 }
475 .build()
476 })?
477 .unwrap_or(true);
478 if !auto_create_table_hint {
479 let mut instant_table_ids = HashSet::new();
480 for req in &requests.inserts {
481 let table = self
482 .get_table(catalog, &schema, &req.table_name)
483 .await?
484 .context(InvalidInsertRequestSnafu {
485 reason: format!(
486 "Table `{}` does not exist, and `auto_create_table` hint is disabled",
487 req.table_name
488 ),
489 })?;
490 let table_info = table.table_info();
491 if table_info.is_ttl_instant_table() {
492 instant_table_ids.insert(table_info.table_id());
493 }
494 table_infos.insert(table_info.table_id(), table.table_info());
495 }
496 let ret = CreateAlterTableResult {
497 instant_table_ids,
498 table_infos,
499 };
500 return Ok(ret);
501 }
502
503 let mut create_tables = vec![];
504 let mut alter_tables = vec![];
505 let mut instant_table_ids = HashSet::new();
506
507 for req in &requests.inserts {
508 match self.get_table(catalog, &schema, &req.table_name).await? {
509 Some(table) => {
510 let table_info = table.table_info();
511 if table_info.is_ttl_instant_table() {
512 instant_table_ids.insert(table_info.table_id());
513 }
514 table_infos.insert(table_info.table_id(), table.table_info());
515 if let Some(alter_expr) =
516 self.get_alter_table_expr_on_demand(req, &table, ctx)?
517 {
518 alter_tables.push(alter_expr);
519 }
520 }
521 None => {
522 let create_expr =
523 self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
524 create_tables.push(create_expr);
525 }
526 }
527 }
528
529 match auto_create_table_type {
530 AutoCreateTableType::Logical(_) => {
531 if !create_tables.is_empty() {
532 let tables = self
534 .create_logical_tables(create_tables, ctx, statement_executor)
535 .await?;
536
537 for table in tables {
538 let table_info = table.table_info();
539 if table_info.is_ttl_instant_table() {
540 instant_table_ids.insert(table_info.table_id());
541 }
542 table_infos.insert(table_info.table_id(), table.table_info());
543 }
544 }
545 if !alter_tables.is_empty() {
546 statement_executor
548 .alter_logical_tables(alter_tables, ctx.clone())
549 .await?;
550 }
551 }
552 AutoCreateTableType::Physical
553 | AutoCreateTableType::Log
554 | AutoCreateTableType::LastNonNull => {
555 for create_table in create_tables {
558 let table = self
559 .create_physical_table(create_table, None, ctx, statement_executor)
560 .await?;
561 let table_info = table.table_info();
562 if table_info.is_ttl_instant_table() {
563 instant_table_ids.insert(table_info.table_id());
564 }
565 table_infos.insert(table_info.table_id(), table.table_info());
566 }
567 for alter_expr in alter_tables.into_iter() {
568 statement_executor
569 .alter_table_inner(alter_expr, ctx.clone())
570 .await?;
571 }
572 }
573
574 AutoCreateTableType::Trace => {
575 let trace_table_name = ctx
576 .extension(TRACE_TABLE_NAME_SESSION_KEY)
577 .unwrap_or(TRACE_TABLE_NAME);
578
579 for mut create_table in create_tables {
582 if create_table.table_name == trace_services_table_name(trace_table_name) {
583 create_table
585 .table_options
586 .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
587 let table = self
588 .create_physical_table(create_table, None, ctx, statement_executor)
589 .await?;
590 let table_info = table.table_info();
591 if table_info.is_ttl_instant_table() {
592 instant_table_ids.insert(table_info.table_id());
593 }
594 table_infos.insert(table_info.table_id(), table.table_info());
595 } else {
596 let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
599 .context(CreatePartitionRulesSnafu)?;
600 let index_columns =
605 [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
606 for index_column in index_columns {
607 if let Some(col) = create_table
608 .column_defs
609 .iter_mut()
610 .find(|c| c.name == index_column)
611 {
612 col.options =
613 options_from_skipping(&SkippingIndexOptions::default())
614 .context(ColumnOptionsSnafu)?;
615 } else {
616 warn!(
617 "Column {} not found when creating index for trace table: {}.",
618 index_column, create_table.table_name
619 );
620 }
621 }
622
623 create_table.table_options.insert(
625 TABLE_DATA_MODEL.to_string(),
626 TABLE_DATA_MODEL_TRACE_V1.to_string(),
627 );
628
629 let table = self
630 .create_physical_table(
631 create_table,
632 Some(partitions),
633 ctx,
634 statement_executor,
635 )
636 .await?;
637 let table_info = table.table_info();
638 if table_info.is_ttl_instant_table() {
639 instant_table_ids.insert(table_info.table_id());
640 }
641 table_infos.insert(table_info.table_id(), table.table_info());
642 }
643 }
644 for alter_expr in alter_tables.into_iter() {
645 statement_executor
646 .alter_table_inner(alter_expr, ctx.clone())
647 .await?;
648 }
649 }
650 }
651
652 Ok(CreateAlterTableResult {
653 instant_table_ids,
654 table_infos,
655 })
656 }
657
658 async fn create_physical_table_on_demand(
659 &self,
660 ctx: &QueryContextRef,
661 physical_table: String,
662 statement_executor: &StatementExecutor,
663 ) -> Result<()> {
664 let catalog_name = ctx.current_catalog();
665 let schema_name = ctx.current_schema();
666
667 if self
669 .get_table(catalog_name, &schema_name, &physical_table)
670 .await?
671 .is_some()
672 {
673 return Ok(());
674 }
675
676 let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
677 info!("Physical metric table `{table_reference}` does not exist, try creating table");
678
679 let default_schema = vec![
681 ColumnSchema {
682 column_name: GREPTIME_TIMESTAMP.to_string(),
683 datatype: ColumnDataType::TimestampMillisecond as _,
684 semantic_type: SemanticType::Timestamp as _,
685 datatype_extension: None,
686 options: None,
687 },
688 ColumnSchema {
689 column_name: GREPTIME_VALUE.to_string(),
690 datatype: ColumnDataType::Float64 as _,
691 semantic_type: SemanticType::Field as _,
692 datatype_extension: None,
693 options: None,
694 },
695 ];
696 let create_table_expr =
697 &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
698
699 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
700 create_table_expr
701 .table_options
702 .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
703
704 let res = statement_executor
706 .create_table_inner(create_table_expr, None, ctx.clone())
707 .await;
708
709 match res {
710 Ok(_) => {
711 info!("Successfully created table {table_reference}",);
712 Ok(())
713 }
714 Err(err) => {
715 error!(err; "Failed to create table {table_reference}");
716 Err(err)
717 }
718 }
719 }
720
721 async fn get_table(
722 &self,
723 catalog: &str,
724 schema: &str,
725 table: &str,
726 ) -> Result<Option<TableRef>> {
727 self.catalog_manager
728 .table(catalog, schema, table, None)
729 .await
730 .context(CatalogSnafu)
731 }
732
733 fn get_create_table_expr_on_demand(
734 &self,
735 req: &RowInsertRequest,
736 create_type: &AutoCreateTableType,
737 ctx: &QueryContextRef,
738 ) -> Result<CreateTableExpr> {
739 let mut table_options = Vec::with_capacity(4);
740 for key in VALID_TABLE_OPTION_KEYS {
741 if let Some(value) = ctx.extension(key) {
742 table_options.push((key, value));
743 }
744 }
745
746 let mut engine_name = default_engine();
747 match create_type {
748 AutoCreateTableType::Logical(physical_table) => {
749 engine_name = METRIC_ENGINE_NAME;
750 table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
751 }
752 AutoCreateTableType::Physical => {
753 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
754 table_options.push((APPEND_MODE_KEY, append_mode));
755 }
756 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
757 table_options.push((MERGE_MODE_KEY, merge_mode));
758 }
759 }
760 AutoCreateTableType::Log => {
763 table_options.push((APPEND_MODE_KEY, "true"));
764 }
765 AutoCreateTableType::LastNonNull => {
766 table_options.push((MERGE_MODE_KEY, "last_non_null"));
767 }
768 AutoCreateTableType::Trace => {
769 table_options.push((APPEND_MODE_KEY, "true"));
770 }
771 }
772
773 let schema = ctx.current_schema();
774 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
775 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
777 let mut create_table_expr =
778 build_create_table_expr(&table_ref, request_schema, engine_name)?;
779
780 info!("Table `{table_ref}` does not exist, try creating table");
781 for (k, v) in table_options {
782 create_table_expr
783 .table_options
784 .insert(k.to_string(), v.to_string());
785 }
786
787 Ok(create_table_expr)
788 }
789
790 fn get_alter_table_expr_on_demand(
793 &self,
794 req: &RowInsertRequest,
795 table: &TableRef,
796 ctx: &QueryContextRef,
797 ) -> Result<Option<AlterTableExpr>> {
798 let catalog_name = ctx.current_catalog();
799 let schema_name = ctx.current_schema();
800 let table_name = table.table_info().name.clone();
801
802 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
803 let column_exprs = ColumnExpr::from_column_schemas(request_schema);
804 let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
805 let Some(add_columns) = add_columns else {
806 return Ok(None);
807 };
808
809 Ok(Some(AlterTableExpr {
810 catalog_name: catalog_name.to_string(),
811 schema_name: schema_name.to_string(),
812 table_name: table_name.to_string(),
813 kind: Some(Kind::AddColumns(add_columns)),
814 }))
815 }
816
817 async fn create_physical_table(
819 &self,
820 mut create_table_expr: CreateTableExpr,
821 partitions: Option<Partitions>,
822 ctx: &QueryContextRef,
823 statement_executor: &StatementExecutor,
824 ) -> Result<TableRef> {
825 {
826 let table_ref = TableReference::full(
827 &create_table_expr.catalog_name,
828 &create_table_expr.schema_name,
829 &create_table_expr.table_name,
830 );
831
832 info!("Table `{table_ref}` does not exist, try creating table");
833 }
834 let res = statement_executor
835 .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
836 .await;
837
838 let table_ref = TableReference::full(
839 &create_table_expr.catalog_name,
840 &create_table_expr.schema_name,
841 &create_table_expr.table_name,
842 );
843
844 match res {
845 Ok(table) => {
846 info!(
847 "Successfully created table {} with options: {:?}",
848 table_ref, create_table_expr.table_options,
849 );
850 Ok(table)
851 }
852 Err(err) => {
853 error!(err; "Failed to create table {}", table_ref);
854 Err(err)
855 }
856 }
857 }
858
859 async fn create_logical_tables(
860 &self,
861 create_table_exprs: Vec<CreateTableExpr>,
862 ctx: &QueryContextRef,
863 statement_executor: &StatementExecutor,
864 ) -> Result<Vec<TableRef>> {
865 let res = statement_executor
866 .create_logical_tables(&create_table_exprs, ctx.clone())
867 .await;
868
869 match res {
870 Ok(res) => {
871 info!("Successfully created logical tables");
872 Ok(res)
873 }
874 Err(err) => {
875 let failed_tables = create_table_exprs
876 .into_iter()
877 .map(|expr| {
878 format!(
879 "{}.{}.{}",
880 expr.catalog_name, expr.schema_name, expr.table_name
881 )
882 })
883 .collect::<Vec<_>>();
884 error!(
885 err;
886 "Failed to create logical tables {:?}",
887 failed_tables
888 );
889 Err(err)
890 }
891 }
892 }
893}
894
895fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
896 for request in &requests.inserts {
897 let rows = request.rows.as_ref().unwrap();
898 let column_count = rows.schema.len();
899 rows.rows.iter().try_for_each(|r| {
900 ensure!(
901 r.values.len() == column_count,
902 InvalidInsertRequestSnafu {
903 reason: format!(
904 "column count mismatch, columns: {}, values: {}",
905 column_count,
906 r.values.len()
907 )
908 }
909 );
910 Ok(())
911 })?;
912 }
913 Ok(())
914}
915
916fn build_create_table_expr(
917 table: &TableReference,
918 request_schema: &[ColumnSchema],
919 engine: &str,
920) -> Result<CreateTableExpr> {
921 expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
922}
923
/// Outcome of `Inserter::create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Ids of the involved tables that are TTL-instant tables.
    instant_table_ids: HashSet<TableId>,
    /// Info of every involved table (found or newly created), keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
931
/// Insert requests that are to be mirrored to flownodes.
struct FlowMirrorTask {
    /// The requests to send, grouped by the flownode peer that receives them.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count added to the pending-rows gauge when the task is detached.
    num_rows: usize,
}
936
937impl FlowMirrorTask {
938 async fn new(
939 cache: &TableFlownodeSetCacheRef,
940 requests: impl Iterator<Item = &RegionInsertRequest>,
941 ) -> Result<Self> {
942 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
943 HashMap::new();
944 let mut num_rows = 0;
945
946 for req in requests {
947 let table_id = RegionId::from_u64(req.region_id).table_id();
948 match src_table_reqs.get_mut(&table_id) {
949 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
950 Some(None) => continue,
952 _ => {
953 let peers = cache
954 .get(table_id)
955 .await
956 .context(RequestInsertsSnafu)?
957 .unwrap_or_default()
958 .values()
959 .cloned()
960 .collect::<Vec<_>>();
961
962 if !peers.is_empty() {
963 let mut reqs = RegionInsertRequests::default();
964 reqs.requests.push(req.clone());
965 num_rows += reqs
966 .requests
967 .iter()
968 .map(|r| r.rows.as_ref().unwrap().rows.len())
969 .sum::<usize>();
970 src_table_reqs.insert(table_id, Some((peers, reqs)));
971 } else {
972 src_table_reqs.insert(table_id, None);
974 }
975 }
976 }
977 }
978
979 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
980
981 for (_table_id, (peers, reqs)) in src_table_reqs
982 .into_iter()
983 .filter_map(|(k, v)| v.map(|v| (k, v)))
984 {
985 if peers.len() == 1 {
986 inserts
988 .entry(peers[0].clone())
989 .or_default()
990 .requests
991 .extend(reqs.requests);
992 continue;
993 } else {
994 for flownode in peers {
996 inserts
997 .entry(flownode.clone())
998 .or_default()
999 .requests
1000 .extend(reqs.requests.clone());
1001 }
1002 }
1003 }
1004
1005 Ok(Self {
1006 requests: inserts,
1007 num_rows,
1008 })
1009 }
1010
1011 fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1012 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1013 for (peer, inserts) in self.requests {
1014 let node_manager = node_manager.clone();
1015 common_runtime::spawn_global(async move {
1016 let result = node_manager
1017 .flownode(&peer)
1018 .await
1019 .handle_inserts(inserts)
1020 .await
1021 .context(RequestInsertsSnafu);
1022
1023 match result {
1024 Ok(resp) => {
1025 let affected_rows = resp.affected_rows;
1026 crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1027 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1028 }
1029 Err(err) => {
1030 error!(err; "Failed to insert data into flownode {}", peer);
1031 }
1032 }
1033 });
1034 }
1035
1036 Ok(())
1037 }
1038}