1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
32 TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
39use common_query::Output;
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::prelude::*;
48use snafu::ResultExt;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
56use store_api::storage::{RegionId, TableId};
57use table::metadata::TableInfo;
58use table::requests::{
59 InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
60 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
61};
62use table::table_reference::TableReference;
63use table::TableRef;
64
65use crate::error::{
66 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
67 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
68};
69use crate::expr_helper;
70use crate::region_req_factory::RegionRequestFactory;
71use crate::req_convert::common::preprocess_row_insert_requests;
72use crate::req_convert::insert::{
73 fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
74};
75use crate::statement::StatementExecutor;
76
77pub struct Inserter {
78 catalog_manager: CatalogManagerRef,
79 pub(crate) partition_manager: PartitionRuleManagerRef,
80 pub(crate) node_manager: NodeManagerRef,
81 pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
82}
83
84pub type InserterRef = Arc<Inserter>;
85
/// Decides how a table that does not exist yet is created while inserting.
#[derive(Debug, Clone)]
pub enum AutoCreateTableType {
    /// A metric-engine logical table backed by the physical table with this name.
    Logical(String),
    /// A physical table using the default engine.
    Physical,
    /// A physical table created in append mode.
    Log,
    /// A physical table created with the `last_non_null` merge mode.
    LastNonNull,
    /// A trace table, created with trace-specific partitioning, indexes and options.
    Trace,
}
100
101impl AutoCreateTableType {
102 fn as_str(&self) -> &'static str {
103 match self {
104 AutoCreateTableType::Logical(_) => "logical",
105 AutoCreateTableType::Physical => "physical",
106 AutoCreateTableType::Log => "log",
107 AutoCreateTableType::LastNonNull => "last_non_null",
108 AutoCreateTableType::Trace => "trace",
109 }
110 }
111}
112
113#[derive(Clone)]
120pub struct InstantAndNormalInsertRequests {
121 pub normal_requests: RegionInsertRequests,
123 pub instant_requests: RegionInsertRequests,
126}
127
128impl Inserter {
129 pub fn new(
130 catalog_manager: CatalogManagerRef,
131 partition_manager: PartitionRuleManagerRef,
132 node_manager: NodeManagerRef,
133 table_flownode_set_cache: TableFlownodeSetCacheRef,
134 ) -> Self {
135 Self {
136 catalog_manager,
137 partition_manager,
138 node_manager,
139 table_flownode_set_cache,
140 }
141 }
142
143 pub async fn handle_column_inserts(
144 &self,
145 requests: InsertRequests,
146 ctx: QueryContextRef,
147 statement_executor: &StatementExecutor,
148 ) -> Result<Output> {
149 let row_inserts = ColumnToRow::convert(requests)?;
150 self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
151 .await
152 }
153
154 pub async fn handle_row_inserts(
156 &self,
157 mut requests: RowInsertRequests,
158 ctx: QueryContextRef,
159 statement_executor: &StatementExecutor,
160 accommodate_existing_schema: bool,
161 is_single_value: bool,
162 ) -> Result<Output> {
163 preprocess_row_insert_requests(&mut requests.inserts)?;
164 self.handle_row_inserts_with_create_type(
165 requests,
166 ctx,
167 statement_executor,
168 AutoCreateTableType::Physical,
169 accommodate_existing_schema,
170 is_single_value,
171 )
172 .await
173 }
174
175 pub async fn handle_log_inserts(
177 &self,
178 requests: RowInsertRequests,
179 ctx: QueryContextRef,
180 statement_executor: &StatementExecutor,
181 ) -> Result<Output> {
182 self.handle_row_inserts_with_create_type(
183 requests,
184 ctx,
185 statement_executor,
186 AutoCreateTableType::Log,
187 false,
188 false,
189 )
190 .await
191 }
192
193 pub async fn handle_trace_inserts(
194 &self,
195 requests: RowInsertRequests,
196 ctx: QueryContextRef,
197 statement_executor: &StatementExecutor,
198 ) -> Result<Output> {
199 self.handle_row_inserts_with_create_type(
200 requests,
201 ctx,
202 statement_executor,
203 AutoCreateTableType::Trace,
204 false,
205 false,
206 )
207 .await
208 }
209
210 pub async fn handle_last_non_null_inserts(
212 &self,
213 requests: RowInsertRequests,
214 ctx: QueryContextRef,
215 statement_executor: &StatementExecutor,
216 accommodate_existing_schema: bool,
217 is_single_value: bool,
218 ) -> Result<Output> {
219 self.handle_row_inserts_with_create_type(
220 requests,
221 ctx,
222 statement_executor,
223 AutoCreateTableType::LastNonNull,
224 accommodate_existing_schema,
225 is_single_value,
226 )
227 .await
228 }
229
230 async fn handle_row_inserts_with_create_type(
232 &self,
233 mut requests: RowInsertRequests,
234 ctx: QueryContextRef,
235 statement_executor: &StatementExecutor,
236 create_type: AutoCreateTableType,
237 accommodate_existing_schema: bool,
238 is_single_value: bool,
239 ) -> Result<Output> {
240 requests.inserts.retain(|req| {
242 req.rows
243 .as_ref()
244 .map(|r| !r.rows.is_empty())
245 .unwrap_or_default()
246 });
247 validate_column_count_match(&requests)?;
248
249 let CreateAlterTableResult {
250 instant_table_ids,
251 table_infos,
252 } = self
253 .create_or_alter_tables_on_demand(
254 &mut requests,
255 &ctx,
256 create_type,
257 statement_executor,
258 accommodate_existing_schema,
259 is_single_value,
260 )
261 .await?;
262
263 let name_to_info = table_infos
264 .values()
265 .map(|info| (info.name.clone(), info.clone()))
266 .collect::<HashMap<_, _>>();
267 let inserts = RowToRegion::new(
268 name_to_info,
269 instant_table_ids,
270 self.partition_manager.as_ref(),
271 )
272 .convert(requests)
273 .await?;
274
275 self.do_request(inserts, &table_infos, &ctx).await
276 }
277
278 pub async fn handle_metric_row_inserts(
280 &self,
281 mut requests: RowInsertRequests,
282 ctx: QueryContextRef,
283 statement_executor: &StatementExecutor,
284 physical_table: String,
285 ) -> Result<Output> {
286 requests.inserts.retain(|req| {
288 req.rows
289 .as_ref()
290 .map(|r| !r.rows.is_empty())
291 .unwrap_or_default()
292 });
293 validate_column_count_match(&requests)?;
294
295 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
297 .await?;
298
299 let CreateAlterTableResult {
301 instant_table_ids,
302 table_infos,
303 } = self
304 .create_or_alter_tables_on_demand(
305 &mut requests,
306 &ctx,
307 AutoCreateTableType::Logical(physical_table.to_string()),
308 statement_executor,
309 true,
310 true,
311 )
312 .await?;
313 let name_to_info = table_infos
314 .values()
315 .map(|info| (info.name.clone(), info.clone()))
316 .collect::<HashMap<_, _>>();
317 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
318 .convert(requests)
319 .await?;
320
321 self.do_request(inserts, &table_infos, &ctx).await
322 }
323
324 pub async fn handle_table_insert(
325 &self,
326 request: TableInsertRequest,
327 ctx: QueryContextRef,
328 ) -> Result<Output> {
329 let catalog = request.catalog_name.as_str();
330 let schema = request.schema_name.as_str();
331 let table_name = request.table_name.as_str();
332 let table = self.get_table(catalog, schema, table_name).await?;
333 let table = table.with_context(|| TableNotFoundSnafu {
334 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
335 })?;
336 let table_info = table.table_info();
337
338 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
339 .convert(request)
340 .await?;
341
342 let table_infos =
343 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
344
345 self.do_request(inserts, &table_infos, &ctx).await
346 }
347
348 pub async fn handle_statement_insert(
349 &self,
350 insert: &Insert,
351 ctx: &QueryContextRef,
352 ) -> Result<Output> {
353 let (inserts, table_info) =
354 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
355 .convert(insert, ctx)
356 .await?;
357
358 let table_infos =
359 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
360
361 self.do_request(inserts, &table_infos, ctx).await
362 }
363}
364
365impl Inserter {
366 async fn do_request(
367 &self,
368 requests: InstantAndNormalInsertRequests,
369 table_infos: &HashMap<TableId, Arc<TableInfo>>,
370 ctx: &QueryContextRef,
371 ) -> Result<Output> {
372 let requests = fill_reqs_with_impure_default(table_infos, requests)?;
374
375 let write_cost = write_meter!(
376 ctx.current_catalog(),
377 ctx.current_schema(),
378 requests,
379 ctx.channel() as u8
380 );
381 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
382 tracing_context: TracingContext::from_current_span().to_w3c(),
383 dbname: ctx.get_db_string(),
384 ..Default::default()
385 });
386
387 let InstantAndNormalInsertRequests {
388 normal_requests,
389 instant_requests,
390 } = requests;
391
392 let flow_mirror_task = FlowMirrorTask::new(
394 &self.table_flownode_set_cache,
395 normal_requests
396 .requests
397 .iter()
398 .chain(instant_requests.requests.iter()),
399 )
400 .await?;
401 flow_mirror_task.detach(self.node_manager.clone())?;
402
403 let write_tasks = self
405 .group_requests_by_peer(normal_requests)
406 .await?
407 .into_iter()
408 .map(|(peer, inserts)| {
409 let node_manager = self.node_manager.clone();
410 let request = request_factory.build_insert(inserts);
411 common_runtime::spawn_global(async move {
412 node_manager
413 .datanode(&peer)
414 .await
415 .handle(request)
416 .await
417 .context(RequestInsertsSnafu)
418 })
419 });
420 let results = future::try_join_all(write_tasks)
421 .await
422 .context(JoinTaskSnafu)?;
423 let affected_rows = results
424 .into_iter()
425 .map(|resp| resp.map(|r| r.affected_rows))
426 .sum::<Result<AffectedRows>>()?;
427 crate::metrics::DIST_INGEST_ROW_COUNT
428 .with_label_values(&[ctx.get_db_string().as_str()])
429 .inc_by(affected_rows as u64);
430 Ok(Output::new(
431 OutputData::AffectedRows(affected_rows),
432 OutputMeta::new_with_cost(write_cost as _),
433 ))
434 }
435
436 async fn group_requests_by_peer(
437 &self,
438 requests: RegionInsertRequests,
439 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
440 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
443 for req in requests.requests {
444 let region_id = RegionId::from_u64(req.region_id);
445 requests_per_region
446 .entry(region_id)
447 .or_default()
448 .requests
449 .push(req);
450 }
451
452 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
453
454 for (region_id, reqs) in requests_per_region {
455 let peer = self
456 .partition_manager
457 .find_region_leader(region_id)
458 .await
459 .context(FindRegionLeaderSnafu)?;
460 inserts
461 .entry(peer)
462 .or_default()
463 .requests
464 .extend(reqs.requests);
465 }
466
467 Ok(inserts)
468 }
469
470 async fn create_or_alter_tables_on_demand(
483 &self,
484 requests: &mut RowInsertRequests,
485 ctx: &QueryContextRef,
486 auto_create_table_type: AutoCreateTableType,
487 statement_executor: &StatementExecutor,
488 accommodate_existing_schema: bool,
489 is_single_value: bool,
490 ) -> Result<CreateAlterTableResult> {
491 let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
492 .with_label_values(&[auto_create_table_type.as_str()])
493 .start_timer();
494
495 let catalog = ctx.current_catalog();
496 let schema = ctx.current_schema();
497
498 let mut table_infos = HashMap::new();
499 let auto_create_table_hint = ctx
501 .extension(AUTO_CREATE_TABLE_KEY)
502 .map(|v| v.parse::<bool>())
503 .transpose()
504 .map_err(|_| {
505 InvalidInsertRequestSnafu {
506 reason: "`auto_create_table` hint must be a boolean",
507 }
508 .build()
509 })?
510 .unwrap_or(true);
511 if !auto_create_table_hint {
512 let mut instant_table_ids = HashSet::new();
513 for req in &requests.inserts {
514 let table = self
515 .get_table(catalog, &schema, &req.table_name)
516 .await?
517 .context(InvalidInsertRequestSnafu {
518 reason: format!(
519 "Table `{}` does not exist, and `auto_create_table` hint is disabled",
520 req.table_name
521 ),
522 })?;
523 let table_info = table.table_info();
524 if table_info.is_ttl_instant_table() {
525 instant_table_ids.insert(table_info.table_id());
526 }
527 table_infos.insert(table_info.table_id(), table.table_info());
528 }
529 let ret = CreateAlterTableResult {
530 instant_table_ids,
531 table_infos,
532 };
533 return Ok(ret);
534 }
535
536 let mut create_tables = vec![];
537 let mut alter_tables = vec![];
538 let mut instant_table_ids = HashSet::new();
539
540 for req in &mut requests.inserts {
541 match self.get_table(catalog, &schema, &req.table_name).await? {
542 Some(table) => {
543 let table_info = table.table_info();
544 if table_info.is_ttl_instant_table() {
545 instant_table_ids.insert(table_info.table_id());
546 }
547 table_infos.insert(table_info.table_id(), table.table_info());
548 if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
549 req,
550 &table,
551 ctx,
552 accommodate_existing_schema,
553 is_single_value,
554 )? {
555 alter_tables.push(alter_expr);
556 }
557 }
558 None => {
559 let create_expr =
560 self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
561 create_tables.push(create_expr);
562 }
563 }
564 }
565
566 match auto_create_table_type {
567 AutoCreateTableType::Logical(_) => {
568 if !create_tables.is_empty() {
569 let tables = self
571 .create_logical_tables(create_tables, ctx, statement_executor)
572 .await?;
573
574 for table in tables {
575 let table_info = table.table_info();
576 if table_info.is_ttl_instant_table() {
577 instant_table_ids.insert(table_info.table_id());
578 }
579 table_infos.insert(table_info.table_id(), table.table_info());
580 }
581 }
582 if !alter_tables.is_empty() {
583 statement_executor
585 .alter_logical_tables(alter_tables, ctx.clone())
586 .await?;
587 }
588 }
589 AutoCreateTableType::Physical
590 | AutoCreateTableType::Log
591 | AutoCreateTableType::LastNonNull => {
592 for create_table in create_tables {
595 let table = self
596 .create_physical_table(create_table, None, ctx, statement_executor)
597 .await?;
598 let table_info = table.table_info();
599 if table_info.is_ttl_instant_table() {
600 instant_table_ids.insert(table_info.table_id());
601 }
602 table_infos.insert(table_info.table_id(), table.table_info());
603 }
604 for alter_expr in alter_tables.into_iter() {
605 statement_executor
606 .alter_table_inner(alter_expr, ctx.clone())
607 .await?;
608 }
609 }
610
611 AutoCreateTableType::Trace => {
612 let trace_table_name = ctx
613 .extension(TRACE_TABLE_NAME_SESSION_KEY)
614 .unwrap_or(TRACE_TABLE_NAME);
615
616 for mut create_table in create_tables {
619 if create_table.table_name == trace_services_table_name(trace_table_name) {
620 create_table
622 .table_options
623 .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
624 let table = self
625 .create_physical_table(create_table, None, ctx, statement_executor)
626 .await?;
627 let table_info = table.table_info();
628 if table_info.is_ttl_instant_table() {
629 instant_table_ids.insert(table_info.table_id());
630 }
631 table_infos.insert(table_info.table_id(), table.table_info());
632 } else {
633 let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
636 .context(CreatePartitionRulesSnafu)?;
637 let index_columns =
642 [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
643 for index_column in index_columns {
644 if let Some(col) = create_table
645 .column_defs
646 .iter_mut()
647 .find(|c| c.name == index_column)
648 {
649 col.options =
650 options_from_skipping(&SkippingIndexOptions::default())
651 .context(ColumnOptionsSnafu)?;
652 } else {
653 warn!(
654 "Column {} not found when creating index for trace table: {}.",
655 index_column, create_table.table_name
656 );
657 }
658 }
659
660 create_table.table_options.insert(
662 TABLE_DATA_MODEL.to_string(),
663 TABLE_DATA_MODEL_TRACE_V1.to_string(),
664 );
665
666 let table = self
667 .create_physical_table(
668 create_table,
669 Some(partitions),
670 ctx,
671 statement_executor,
672 )
673 .await?;
674 let table_info = table.table_info();
675 if table_info.is_ttl_instant_table() {
676 instant_table_ids.insert(table_info.table_id());
677 }
678 table_infos.insert(table_info.table_id(), table.table_info());
679 }
680 }
681 for alter_expr in alter_tables.into_iter() {
682 statement_executor
683 .alter_table_inner(alter_expr, ctx.clone())
684 .await?;
685 }
686 }
687 }
688
689 Ok(CreateAlterTableResult {
690 instant_table_ids,
691 table_infos,
692 })
693 }
694
695 async fn create_physical_table_on_demand(
696 &self,
697 ctx: &QueryContextRef,
698 physical_table: String,
699 statement_executor: &StatementExecutor,
700 ) -> Result<()> {
701 let catalog_name = ctx.current_catalog();
702 let schema_name = ctx.current_schema();
703
704 if self
706 .get_table(catalog_name, &schema_name, &physical_table)
707 .await?
708 .is_some()
709 {
710 return Ok(());
711 }
712
713 let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
714 info!("Physical metric table `{table_reference}` does not exist, try creating table");
715
716 let default_schema = vec![
718 ColumnSchema {
719 column_name: GREPTIME_TIMESTAMP.to_string(),
720 datatype: ColumnDataType::TimestampMillisecond as _,
721 semantic_type: SemanticType::Timestamp as _,
722 datatype_extension: None,
723 options: None,
724 },
725 ColumnSchema {
726 column_name: GREPTIME_VALUE.to_string(),
727 datatype: ColumnDataType::Float64 as _,
728 semantic_type: SemanticType::Field as _,
729 datatype_extension: None,
730 options: None,
731 },
732 ];
733 let create_table_expr =
734 &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
735
736 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
737 create_table_expr
738 .table_options
739 .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
740
741 let res = statement_executor
743 .create_table_inner(create_table_expr, None, ctx.clone())
744 .await;
745
746 match res {
747 Ok(_) => {
748 info!("Successfully created table {table_reference}",);
749 Ok(())
750 }
751 Err(err) => {
752 error!(err; "Failed to create table {table_reference}");
753 Err(err)
754 }
755 }
756 }
757
758 async fn get_table(
759 &self,
760 catalog: &str,
761 schema: &str,
762 table: &str,
763 ) -> Result<Option<TableRef>> {
764 self.catalog_manager
765 .table(catalog, schema, table, None)
766 .await
767 .context(CatalogSnafu)
768 }
769
770 fn get_create_table_expr_on_demand(
771 &self,
772 req: &RowInsertRequest,
773 create_type: &AutoCreateTableType,
774 ctx: &QueryContextRef,
775 ) -> Result<CreateTableExpr> {
776 let mut table_options = std::collections::HashMap::with_capacity(4);
777 fill_table_options_for_create(&mut table_options, create_type, ctx);
778
779 let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
780 METRIC_ENGINE_NAME
782 } else {
783 default_engine()
784 };
785
786 let schema = ctx.current_schema();
787 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
788 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
790 let mut create_table_expr =
791 build_create_table_expr(&table_ref, request_schema, engine_name)?;
792
793 info!("Table `{table_ref}` does not exist, try creating table");
794 create_table_expr.table_options.extend(table_options);
795 Ok(create_table_expr)
796 }
797
798 fn get_alter_table_expr_on_demand(
806 &self,
807 req: &mut RowInsertRequest,
808 table: &TableRef,
809 ctx: &QueryContextRef,
810 accommodate_existing_schema: bool,
811 is_single_value: bool,
812 ) -> Result<Option<AlterTableExpr>> {
813 let catalog_name = ctx.current_catalog();
814 let schema_name = ctx.current_schema();
815 let table_name = table.table_info().name.clone();
816
817 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
818 let column_exprs = ColumnExpr::from_column_schemas(request_schema);
819 let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
820 let Some(mut add_columns) = add_columns else {
821 return Ok(None);
822 };
823
824 if accommodate_existing_schema {
826 let table_schema = table.schema();
827 let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
829 let mut field_col_name = None;
831 if is_single_value {
832 let mut multiple_field_cols = false;
833 table.field_columns().for_each(|col| {
834 if field_col_name.is_none() {
835 field_col_name = Some(col.name.clone());
836 } else {
837 multiple_field_cols = true;
838 }
839 });
840 if multiple_field_cols {
841 field_col_name = None;
842 }
843 }
844
845 if let Some(rows) = req.rows.as_mut() {
847 for col in &mut rows.schema {
848 match col.semantic_type {
849 x if x == SemanticType::Timestamp as i32 => {
850 if let Some(ref ts_name) = ts_col_name {
851 if col.column_name != *ts_name {
852 col.column_name = ts_name.clone();
853 }
854 }
855 }
856 x if x == SemanticType::Field as i32 => {
857 if let Some(ref field_name) = field_col_name {
858 if col.column_name != *field_name {
859 col.column_name = field_name.clone();
860 }
861 }
862 }
863 _ => {}
864 }
865 }
866 }
867
868 add_columns.add_columns.retain(|col| {
870 let def = col.column_def.as_ref().unwrap();
871 def.semantic_type == SemanticType::Tag as i32
872 || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
873 });
874
875 if add_columns.add_columns.is_empty() {
876 return Ok(None);
877 }
878 }
879
880 Ok(Some(AlterTableExpr {
881 catalog_name: catalog_name.to_string(),
882 schema_name: schema_name.to_string(),
883 table_name: table_name.to_string(),
884 kind: Some(Kind::AddColumns(add_columns)),
885 }))
886 }
887
888 async fn create_physical_table(
890 &self,
891 mut create_table_expr: CreateTableExpr,
892 partitions: Option<Partitions>,
893 ctx: &QueryContextRef,
894 statement_executor: &StatementExecutor,
895 ) -> Result<TableRef> {
896 {
897 let table_ref = TableReference::full(
898 &create_table_expr.catalog_name,
899 &create_table_expr.schema_name,
900 &create_table_expr.table_name,
901 );
902
903 info!("Table `{table_ref}` does not exist, try creating table");
904 }
905 let res = statement_executor
906 .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
907 .await;
908
909 let table_ref = TableReference::full(
910 &create_table_expr.catalog_name,
911 &create_table_expr.schema_name,
912 &create_table_expr.table_name,
913 );
914
915 match res {
916 Ok(table) => {
917 info!(
918 "Successfully created table {} with options: {:?}",
919 table_ref, create_table_expr.table_options,
920 );
921 Ok(table)
922 }
923 Err(err) => {
924 error!(err; "Failed to create table {}", table_ref);
925 Err(err)
926 }
927 }
928 }
929
930 async fn create_logical_tables(
931 &self,
932 create_table_exprs: Vec<CreateTableExpr>,
933 ctx: &QueryContextRef,
934 statement_executor: &StatementExecutor,
935 ) -> Result<Vec<TableRef>> {
936 let res = statement_executor
937 .create_logical_tables(&create_table_exprs, ctx.clone())
938 .await;
939
940 match res {
941 Ok(res) => {
942 info!("Successfully created logical tables");
943 Ok(res)
944 }
945 Err(err) => {
946 let failed_tables = create_table_exprs
947 .into_iter()
948 .map(|expr| {
949 format!(
950 "{}.{}.{}",
951 expr.catalog_name, expr.schema_name, expr.table_name
952 )
953 })
954 .collect::<Vec<_>>();
955 error!(
956 err;
957 "Failed to create logical tables {:?}",
958 failed_tables
959 );
960 Err(err)
961 }
962 }
963 }
964
965 pub fn node_manager(&self) -> &NodeManagerRef {
966 &self.node_manager
967 }
968
969 pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
970 &self.partition_manager
971 }
972}
973
974fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
975 for request in &requests.inserts {
976 let rows = request.rows.as_ref().unwrap();
977 let column_count = rows.schema.len();
978 rows.rows.iter().try_for_each(|r| {
979 ensure!(
980 r.values.len() == column_count,
981 InvalidInsertRequestSnafu {
982 reason: format!(
983 "column count mismatch, columns: {}, values: {}",
984 column_count,
985 r.values.len()
986 )
987 }
988 );
989 Ok(())
990 })?;
991 }
992 Ok(())
993}
994
995pub fn fill_table_options_for_create(
997 table_options: &mut std::collections::HashMap<String, String>,
998 create_type: &AutoCreateTableType,
999 ctx: &QueryContextRef,
1000) {
1001 for key in VALID_TABLE_OPTION_KEYS {
1002 if let Some(value) = ctx.extension(key) {
1003 table_options.insert(key.to_string(), value.to_string());
1004 }
1005 }
1006
1007 match create_type {
1008 AutoCreateTableType::Logical(physical_table) => {
1009 table_options.insert(
1010 LOGICAL_TABLE_METADATA_KEY.to_string(),
1011 physical_table.to_string(),
1012 );
1013 }
1014 AutoCreateTableType::Physical => {
1015 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1016 table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1017 }
1018 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1019 table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1020 }
1021 }
1022 AutoCreateTableType::Log => {
1025 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1026 }
1027 AutoCreateTableType::LastNonNull => {
1028 table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1029 }
1030 AutoCreateTableType::Trace => {
1031 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1032 }
1033 }
1034}
1035
1036pub fn build_create_table_expr(
1037 table: &TableReference,
1038 request_schema: &[ColumnSchema],
1039 engine: &str,
1040) -> Result<CreateTableExpr> {
1041 expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1042}
1043
1044struct CreateAlterTableResult {
1046 instant_table_ids: HashSet<TableId>,
1048 table_infos: HashMap<TableId, Arc<TableInfo>>,
1050}
1051
1052struct FlowMirrorTask {
1053 requests: HashMap<Peer, RegionInsertRequests>,
1054 num_rows: usize,
1055}
1056
1057impl FlowMirrorTask {
1058 async fn new(
1059 cache: &TableFlownodeSetCacheRef,
1060 requests: impl Iterator<Item = &RegionInsertRequest>,
1061 ) -> Result<Self> {
1062 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1063 HashMap::new();
1064 let mut num_rows = 0;
1065
1066 for req in requests {
1067 let table_id = RegionId::from_u64(req.region_id).table_id();
1068 match src_table_reqs.get_mut(&table_id) {
1069 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1070 Some(None) => continue,
1072 _ => {
1073 let peers = cache
1075 .get(table_id)
1076 .await
1077 .context(RequestInsertsSnafu)?
1078 .unwrap_or_default()
1079 .values()
1080 .cloned()
1081 .collect::<HashSet<_>>()
1082 .into_iter()
1083 .collect::<Vec<_>>();
1084
1085 if !peers.is_empty() {
1086 let mut reqs = RegionInsertRequests::default();
1087 reqs.requests.push(req.clone());
1088 num_rows += reqs
1089 .requests
1090 .iter()
1091 .map(|r| r.rows.as_ref().unwrap().rows.len())
1092 .sum::<usize>();
1093 src_table_reqs.insert(table_id, Some((peers, reqs)));
1094 } else {
1095 src_table_reqs.insert(table_id, None);
1097 }
1098 }
1099 }
1100 }
1101
1102 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1103
1104 for (_table_id, (peers, reqs)) in src_table_reqs
1105 .into_iter()
1106 .filter_map(|(k, v)| v.map(|v| (k, v)))
1107 {
1108 if peers.len() == 1 {
1109 inserts
1111 .entry(peers[0].clone())
1112 .or_default()
1113 .requests
1114 .extend(reqs.requests);
1115 continue;
1116 } else {
1117 for flownode in peers {
1119 inserts
1120 .entry(flownode.clone())
1121 .or_default()
1122 .requests
1123 .extend(reqs.requests.clone());
1124 }
1125 }
1126 }
1127
1128 Ok(Self {
1129 requests: inserts,
1130 num_rows,
1131 })
1132 }
1133
1134 fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1135 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1136 for (peer, inserts) in self.requests {
1137 let node_manager = node_manager.clone();
1138 common_runtime::spawn_global(async move {
1139 let result = node_manager
1140 .flownode(&peer)
1141 .await
1142 .handle_inserts(inserts)
1143 .await
1144 .context(RequestInsertsSnafu);
1145
1146 match result {
1147 Ok(resp) => {
1148 let affected_rows = resp.affected_rows;
1149 crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1150 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1151 }
1152 Err(err) => {
1153 error!(err; "Failed to insert data into flownode {}", peer);
1154 }
1155 }
1156 });
1157 }
1158
1159 Ok(())
1160 }
1161}
1162
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
    use table::TableRef;

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds `test_table` (id 1, a single region) with two columns:
    /// - a non-null millisecond time index named `ts_name`;
    /// - a nullable `float64` field named `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let time_index_column = ColumnSchema::new(
            ts_name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let value_column =
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true);
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            time_index_column,
            value_column,
        ])
        .unwrap()
        .build()
        .unwrap();

        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();

        let table_info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();

        Arc::new(table::Table::new(
            Arc::new(table_info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With schema accommodation and single-value data, misnamed timestamp and
    /// field columns are renamed onto the table's columns, so no alter is needed.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        let request_columns = vec![
            time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
            field_column_schema("field_wrong", ColumnDataType::Float64),
        ];
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: request_columns,
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let catalog_manager = catalog::memory::MemoryCatalogManager::new();
        let partition_manager = create_partition_rule_manager(kv_backend.clone()).await;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let flownode_cache = Arc::new(new_table_flownode_set_cache(
            String::new(),
            Cache::new(100),
            kv_backend.clone(),
        ));
        let inserter = Inserter::new(
            catalog_manager,
            partition_manager,
            node_manager,
            flownode_cache,
        );

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        let column_names: Vec<&str> = req
            .rows
            .as_ref()
            .unwrap()
            .schema
            .iter()
            .map(|column| column.column_name.as_str())
            .collect();
        assert_eq!(column_names, [ts_name, field_name]);
    }
}