1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32 TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33 trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57 APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW,
58};
59use store_api::storage::{RegionId, TableId};
60use table::TableRef;
61use table::metadata::TableInfo;
62use table::requests::{
63 AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
64 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
65};
66use table::table_reference::TableReference;
67
68use crate::error::{
69 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
70 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
71};
72use crate::expr_helper;
73use crate::region_req_factory::RegionRequestFactory;
74use crate::req_convert::common::preprocess_row_insert_requests;
75use crate::req_convert::insert::{
76 ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
77};
78use crate::statement::StatementExecutor;
79
/// Writes rows into tables, creating or altering the target tables on demand.
///
/// The struct only holds the handles the insert paths in this file rely on.
pub struct Inserter {
    /// Used by `get_table` to look a table up by catalog/schema/name.
    catalog_manager: CatalogManagerRef,
    /// Used to find the leader peer of a region and handed to the
    /// request converters (`RowToRegion`, `TableToRegion`, `StatementToRegion`).
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Used to obtain datanode and flownode clients for a given peer.
    pub(crate) node_manager: NodeManagerRef,
    /// Queried per table id to find the flownodes that inserts are mirrored to.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
86
/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
88
/// The flavor of table to create when an insert targets a table that does not exist yet.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A metric-engine logical table; the payload is the name of its physical table.
    Logical(String),
    /// A regular table; table options may be taken from the query context.
    Physical,
    /// A table created with append mode enabled.
    Log,
    /// A table created with the `last_non_null` merge mode.
    LastNonNull,
    /// A table created for trace data.
    Trace,
}

impl AutoCreateTableType {
    /// Returns the fixed, lowercase label of this variant.
    ///
    /// The label is used as a metric label value when timing on-demand table
    /// creation/alteration.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
115
/// Region insert requests split into two groups that are handled differently
/// by `Inserter::do_request`.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests that are written to datanodes and also mirrored to flownodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests that are only mirrored to flownodes; `do_request` never sends
    /// them to a datanode.
    pub instant_requests: RegionInsertRequests,
}
130
131impl Inserter {
    /// Builds an [`Inserter`] from the handles it needs; performs no other work.
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        table_flownode_set_cache: TableFlownodeSetCacheRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
        }
    }
145
146 pub async fn handle_column_inserts(
147 &self,
148 requests: InsertRequests,
149 ctx: QueryContextRef,
150 statement_executor: &StatementExecutor,
151 ) -> Result<Output> {
152 let row_inserts = ColumnToRow::convert(requests)?;
153 self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
154 .await
155 }
156
157 pub async fn handle_row_inserts(
159 &self,
160 mut requests: RowInsertRequests,
161 ctx: QueryContextRef,
162 statement_executor: &StatementExecutor,
163 accommodate_existing_schema: bool,
164 is_single_value: bool,
165 ) -> Result<Output> {
166 preprocess_row_insert_requests(&mut requests.inserts)?;
167 self.handle_row_inserts_with_create_type(
168 requests,
169 ctx,
170 statement_executor,
171 AutoCreateTableType::Physical,
172 accommodate_existing_schema,
173 is_single_value,
174 )
175 .await
176 }
177
178 pub async fn handle_log_inserts(
180 &self,
181 requests: RowInsertRequests,
182 ctx: QueryContextRef,
183 statement_executor: &StatementExecutor,
184 ) -> Result<Output> {
185 self.handle_row_inserts_with_create_type(
186 requests,
187 ctx,
188 statement_executor,
189 AutoCreateTableType::Log,
190 false,
191 false,
192 )
193 .await
194 }
195
196 pub async fn handle_trace_inserts(
197 &self,
198 requests: RowInsertRequests,
199 ctx: QueryContextRef,
200 statement_executor: &StatementExecutor,
201 ) -> Result<Output> {
202 self.handle_row_inserts_with_create_type(
203 requests,
204 ctx,
205 statement_executor,
206 AutoCreateTableType::Trace,
207 false,
208 false,
209 )
210 .await
211 }
212
213 pub async fn handle_last_non_null_inserts(
215 &self,
216 requests: RowInsertRequests,
217 ctx: QueryContextRef,
218 statement_executor: &StatementExecutor,
219 accommodate_existing_schema: bool,
220 is_single_value: bool,
221 ) -> Result<Output> {
222 self.handle_row_inserts_with_create_type(
223 requests,
224 ctx,
225 statement_executor,
226 AutoCreateTableType::LastNonNull,
227 accommodate_existing_schema,
228 is_single_value,
229 )
230 .await
231 }
232
233 async fn handle_row_inserts_with_create_type(
235 &self,
236 mut requests: RowInsertRequests,
237 ctx: QueryContextRef,
238 statement_executor: &StatementExecutor,
239 create_type: AutoCreateTableType,
240 accommodate_existing_schema: bool,
241 is_single_value: bool,
242 ) -> Result<Output> {
243 requests.inserts.retain(|req| {
245 req.rows
246 .as_ref()
247 .map(|r| !r.rows.is_empty())
248 .unwrap_or_default()
249 });
250 validate_column_count_match(&requests)?;
251
252 let CreateAlterTableResult {
253 instant_table_ids,
254 table_infos,
255 } = self
256 .create_or_alter_tables_on_demand(
257 &mut requests,
258 &ctx,
259 create_type,
260 statement_executor,
261 accommodate_existing_schema,
262 is_single_value,
263 )
264 .await?;
265
266 let name_to_info = table_infos
267 .values()
268 .map(|info| (info.name.clone(), info.clone()))
269 .collect::<HashMap<_, _>>();
270 let inserts = RowToRegion::new(
271 name_to_info,
272 instant_table_ids,
273 self.partition_manager.as_ref(),
274 )
275 .convert(requests)
276 .await?;
277
278 self.do_request(inserts, &table_infos, &ctx).await
279 }
280
281 pub async fn handle_metric_row_inserts(
283 &self,
284 mut requests: RowInsertRequests,
285 ctx: QueryContextRef,
286 statement_executor: &StatementExecutor,
287 physical_table: String,
288 ) -> Result<Output> {
289 requests.inserts.retain(|req| {
291 req.rows
292 .as_ref()
293 .map(|r| !r.rows.is_empty())
294 .unwrap_or_default()
295 });
296 validate_column_count_match(&requests)?;
297
298 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
300 .await?;
301
302 let CreateAlterTableResult {
304 instant_table_ids,
305 table_infos,
306 } = self
307 .create_or_alter_tables_on_demand(
308 &mut requests,
309 &ctx,
310 AutoCreateTableType::Logical(physical_table.clone()),
311 statement_executor,
312 true,
313 true,
314 )
315 .await?;
316 let name_to_info = table_infos
317 .values()
318 .map(|info| (info.name.clone(), info.clone()))
319 .collect::<HashMap<_, _>>();
320 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
321 .convert(requests)
322 .await?;
323
324 self.do_request(inserts, &table_infos, &ctx).await
325 }
326
327 pub async fn handle_table_insert(
328 &self,
329 request: TableInsertRequest,
330 ctx: QueryContextRef,
331 ) -> Result<Output> {
332 let catalog = request.catalog_name.as_str();
333 let schema = request.schema_name.as_str();
334 let table_name = request.table_name.as_str();
335 let table = self.get_table(catalog, schema, table_name).await?;
336 let table = table.with_context(|| TableNotFoundSnafu {
337 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
338 })?;
339 let table_info = table.table_info();
340
341 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
342 .convert(request)
343 .await?;
344
345 let table_infos =
346 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
347
348 self.do_request(inserts, &table_infos, &ctx).await
349 }
350
351 pub async fn handle_statement_insert(
352 &self,
353 insert: &Insert,
354 ctx: &QueryContextRef,
355 ) -> Result<Output> {
356 let (inserts, table_info) =
357 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
358 .convert(insert, ctx)
359 .await?;
360
361 let table_infos =
362 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
363
364 self.do_request(inserts, &table_infos, ctx).await
365 }
366}
367
368impl Inserter {
    /// Sends prepared region insert requests and reports the affected rows.
    ///
    /// All requests (normal and instant) are mirrored to flownodes in detached
    /// background tasks; only the normal requests are written to datanodes, one
    /// spawned task per leader peer. The mirror tasks are started before the
    /// datanode writes are awaited and their outcome does not affect the result.
    ///
    /// # Errors
    ///
    /// Fails if filling impure defaults fails, if the flownode lookup or region
    /// leader lookup fails, if a write task cannot be joined, or if any datanode
    /// write returns an error.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Fill impure default values in the requests before anything is sent.
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        // Record the write cost of the (filled) requests for metering.
        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        // Every region request shares the same header: tracing context + db name.
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror both kinds of requests to the flownodes of their source tables.
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Only normal requests are written to datanodes, grouped by leader peer.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        // Outer error: the task could not be joined; inner error: the write failed.
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Summing over `Result`s stops at the first failed write.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT
            .with_label_values(&[ctx.get_db_string().as_str()])
            .inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
438
439 async fn group_requests_by_peer(
440 &self,
441 requests: RegionInsertRequests,
442 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
443 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
446 for req in requests.requests {
447 let region_id = RegionId::from_u64(req.region_id);
448 requests_per_region
449 .entry(region_id)
450 .or_default()
451 .requests
452 .push(req);
453 }
454
455 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
456
457 for (region_id, reqs) in requests_per_region {
458 let peer = self
459 .partition_manager
460 .find_region_leader(region_id)
461 .await
462 .context(FindRegionLeaderSnafu)?;
463 inserts
464 .entry(peer)
465 .or_default()
466 .requests
467 .extend(reqs.requests);
468 }
469
470 Ok(inserts)
471 }
472
    /// Makes sure every table targeted by `requests` exists and has the columns
    /// the requests need, creating or altering tables where necessary.
    ///
    /// Behavior:
    /// - If the `auto_create_table` hint in the query context parses to `false`,
    ///   nothing is created or altered: every target table must already exist,
    ///   otherwise an `InvalidInsertRequest` error is returned.
    /// - Otherwise, for each request an existing table may yield an "add columns"
    ///   alter expression and a missing table yields a create expression. The
    ///   expressions are then executed according to `auto_create_table_type`.
    ///
    /// `requests` is mutable because computing the alter expression may rename
    /// columns in the request schema (see `get_alter_table_expr_on_demand`).
    ///
    /// Returns the infos of all involved tables keyed by table id, plus the ids
    /// of those for which `is_ttl_instant_table()` is true. For tables that are
    /// altered here, the stored info is the one fetched before the alteration.
    ///
    /// # Errors
    ///
    /// Fails if the hint is not a boolean, if a catalog lookup fails, or if any
    /// create/alter operation fails.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        // Times the whole function, labelled by the create type.
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // The hint defaults to `true` when absent; a non-boolean value is an error.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-creation disabled: only collect infos of existing tables and
            // fail on the first missing one. No alteration is attempted either.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut instant_table_ids = HashSet::new();

        // First pass: decide per request whether to alter, create, or do nothing.
        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        // Second pass: execute the collected expressions.
        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                // Logical tables are created and altered in batches.
                if !create_tables.is_empty() {
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Tables are created and altered one at a time, without partitions.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                // The trace table name can be overridden through the query context.
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name)
                        || create_table.table_name == trace_operations_table_name(trace_table_name)
                    {
                        // The services/operations tables derived from the trace
                        // table name get append mode turned off and no partitions.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // Any other trace table is partitioned by a rule built
                        // from the trace id column.
                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
                            .context(CreatePartitionRulesSnafu)?;
                        // These columns get default skipping-index options; a
                        // column missing from the create expression is only logged.
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // Tag the table with the trace v1 data model.
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                Some(partitions),
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }
699
700 async fn create_physical_table_on_demand(
701 &self,
702 ctx: &QueryContextRef,
703 physical_table: String,
704 statement_executor: &StatementExecutor,
705 ) -> Result<()> {
706 let catalog_name = ctx.current_catalog();
707 let schema_name = ctx.current_schema();
708
709 if self
711 .get_table(catalog_name, &schema_name, &physical_table)
712 .await?
713 .is_some()
714 {
715 return Ok(());
716 }
717
718 let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
719 info!("Physical metric table `{table_reference}` does not exist, try creating table");
720
721 let default_schema = vec![
723 ColumnSchema {
724 column_name: greptime_timestamp().to_string(),
725 datatype: ColumnDataType::TimestampMillisecond as _,
726 semantic_type: SemanticType::Timestamp as _,
727 datatype_extension: None,
728 options: None,
729 },
730 ColumnSchema {
731 column_name: greptime_value().to_string(),
732 datatype: ColumnDataType::Float64 as _,
733 semantic_type: SemanticType::Field as _,
734 datatype_extension: None,
735 options: None,
736 },
737 ];
738 let create_table_expr =
739 &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
740
741 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
742 create_table_expr
743 .table_options
744 .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
745
746 let res = statement_executor
748 .create_table_inner(create_table_expr, None, ctx.clone())
749 .await;
750
751 match res {
752 Ok(_) => {
753 info!("Successfully created table {table_reference}",);
754 Ok(())
755 }
756 Err(err) => {
757 error!(err; "Failed to create table {table_reference}");
758 Err(err)
759 }
760 }
761 }
762
763 async fn get_table(
764 &self,
765 catalog: &str,
766 schema: &str,
767 table: &str,
768 ) -> Result<Option<TableRef>> {
769 self.catalog_manager
770 .table(catalog, schema, table, None)
771 .await
772 .context(CatalogSnafu)
773 }
774
775 fn get_create_table_expr_on_demand(
776 &self,
777 req: &RowInsertRequest,
778 create_type: &AutoCreateTableType,
779 ctx: &QueryContextRef,
780 ) -> Result<CreateTableExpr> {
781 let mut table_options = std::collections::HashMap::with_capacity(4);
782 fill_table_options_for_create(&mut table_options, create_type, ctx);
783
784 let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
785 METRIC_ENGINE_NAME
787 } else {
788 default_engine()
789 };
790
791 let schema = ctx.current_schema();
792 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
793 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
795 let mut create_table_expr =
796 build_create_table_expr(&table_ref, request_schema, engine_name)?;
797
798 info!("Table `{table_ref}` does not exist, try creating table");
799 create_table_expr.table_options.extend(table_options);
800 Ok(create_table_expr)
801 }
802
    /// Computes the "add columns" alter expression needed for `table` to accept
    /// `req`, or `None` when no alteration is required.
    ///
    /// When `accommodate_existing_schema` is set, the request is adapted to the
    /// table instead of the other way round where possible:
    /// - the request's timestamp column is renamed to the table's timestamp
    ///   column name;
    /// - if `is_single_value` is set and the table has exactly one field column,
    ///   the request's field columns are renamed to that column's name;
    /// - of the columns to add, only tag columns are kept, plus field columns
    ///   when no single field column name was determined.
    ///
    /// # Panics
    ///
    /// Panics if `req.rows` is `None`, or if an add-column entry has no
    /// `column_def`.
    fn get_alter_table_expr_on_demand(
        &self,
        req: &mut RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        // The columns to add are computed from the request schema as received,
        // i.e. before any renaming below.
        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
        let Some(mut add_columns) = add_columns else {
            return Ok(None);
        };

        if accommodate_existing_schema {
            let table_schema = table.schema();
            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
            // Stays `None` unless the table has exactly one field column and
            // `is_single_value` is set.
            let mut field_col_name = None;
            if is_single_value {
                let mut multiple_field_cols = false;
                table.field_columns().for_each(|col| {
                    if field_col_name.is_none() {
                        field_col_name = Some(col.name.clone());
                    } else {
                        multiple_field_cols = true;
                    }
                });
                if multiple_field_cols {
                    field_col_name = None;
                }
            }

            // Rename the request's timestamp/field columns to the table's names.
            if let Some(rows) = req.rows.as_mut() {
                for col in &mut rows.schema {
                    match col.semantic_type {
                        x if x == SemanticType::Timestamp as i32 => {
                            if let Some(ref ts_name) = ts_col_name
                                && col.column_name != *ts_name
                            {
                                col.column_name = ts_name.clone();
                            }
                        }
                        x if x == SemanticType::Field as i32 => {
                            if let Some(ref field_name) = field_col_name
                                && col.column_name != *field_name
                            {
                                col.column_name = field_name.clone();
                            }
                        }
                        _ => {}
                    }
                }
            }

            // Drop the additions made unnecessary by the renaming above:
            // timestamp columns always, field columns when they were renamed.
            add_columns.add_columns.retain(|col| {
                let def = col.column_def.as_ref().unwrap();
                def.semantic_type == SemanticType::Tag as i32
                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
            });

            if add_columns.add_columns.is_empty() {
                return Ok(None);
            }
        }

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.clone(),
            table_name: table_name.clone(),
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }
892
893 async fn create_physical_table(
895 &self,
896 mut create_table_expr: CreateTableExpr,
897 partitions: Option<Partitions>,
898 ctx: &QueryContextRef,
899 statement_executor: &StatementExecutor,
900 ) -> Result<TableRef> {
901 {
902 let table_ref = TableReference::full(
903 &create_table_expr.catalog_name,
904 &create_table_expr.schema_name,
905 &create_table_expr.table_name,
906 );
907
908 info!("Table `{table_ref}` does not exist, try creating table");
909 }
910 let res = statement_executor
911 .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
912 .await;
913
914 let table_ref = TableReference::full(
915 &create_table_expr.catalog_name,
916 &create_table_expr.schema_name,
917 &create_table_expr.table_name,
918 );
919
920 match res {
921 Ok(table) => {
922 info!(
923 "Successfully created table {} with options: {:?}",
924 table_ref, create_table_expr.table_options,
925 );
926 Ok(table)
927 }
928 Err(err) => {
929 error!(err; "Failed to create table {}", table_ref);
930 Err(err)
931 }
932 }
933 }
934
935 async fn create_logical_tables(
936 &self,
937 create_table_exprs: Vec<CreateTableExpr>,
938 ctx: &QueryContextRef,
939 statement_executor: &StatementExecutor,
940 ) -> Result<Vec<TableRef>> {
941 let res = statement_executor
942 .create_logical_tables(&create_table_exprs, ctx.clone())
943 .await;
944
945 match res {
946 Ok(res) => {
947 info!("Successfully created logical tables");
948 Ok(res)
949 }
950 Err(err) => {
951 let failed_tables = create_table_exprs
952 .into_iter()
953 .map(|expr| {
954 format!(
955 "{}.{}.{}",
956 expr.catalog_name, expr.schema_name, expr.table_name
957 )
958 })
959 .collect::<Vec<_>>();
960 error!(
961 err;
962 "Failed to create logical tables {:?}",
963 failed_tables
964 );
965 Err(err)
966 }
967 }
968 }
969
    /// Returns the node manager this inserter uses to reach datanodes and flownodes.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }
973
    /// Returns the partition rule manager this inserter uses to locate regions.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
977}
978
979fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
980 for request in &requests.inserts {
981 let rows = request.rows.as_ref().unwrap();
982 let column_count = rows.schema.len();
983 rows.rows.iter().try_for_each(|r| {
984 ensure!(
985 r.values.len() == column_count,
986 InvalidInsertRequestSnafu {
987 reason: format!(
988 "column count mismatch, columns: {}, values: {}",
989 column_count,
990 r.values.len()
991 )
992 }
993 );
994 Ok(())
995 })?;
996 }
997 Ok(())
998}
999
1000pub fn fill_table_options_for_create(
1002 table_options: &mut std::collections::HashMap<String, String>,
1003 create_type: &AutoCreateTableType,
1004 ctx: &QueryContextRef,
1005) {
1006 for key in VALID_TABLE_OPTION_KEYS {
1007 if let Some(value) = ctx.extension(key) {
1008 table_options.insert(key.to_string(), value.to_string());
1009 }
1010 }
1011
1012 match create_type {
1013 AutoCreateTableType::Logical(physical_table) => {
1014 table_options.insert(
1015 LOGICAL_TABLE_METADATA_KEY.to_string(),
1016 physical_table.clone(),
1017 );
1018 }
1019 AutoCreateTableType::Physical => {
1020 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1021 table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1022 }
1023 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1024 table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1025 }
1026 if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1027 table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1028 table_options.insert(
1030 COMPACTION_TYPE.to_string(),
1031 COMPACTION_TYPE_TWCS.to_string(),
1032 );
1033 }
1034 }
1035 AutoCreateTableType::Log => {
1038 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1039 }
1040 AutoCreateTableType::LastNonNull => {
1041 table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1042 }
1043 AutoCreateTableType::Trace => {
1044 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1045 }
1046 }
1047}
1048
/// Builds a create-table expression for `table` from the column schemas of an
/// insert request, using the given storage `engine`.
///
/// Thin wrapper around `expr_helper::create_table_expr_by_column_schemas`,
/// always passing `None` as its last argument.
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1056
/// Result of `Inserter::create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Ids of the involved tables for which `is_ttl_instant_table()` is true.
    instant_table_ids: HashSet<TableId>,
    /// Table info of every table involved in the insert, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1064
/// Insert requests to be mirrored to flownodes, grouped by target flownode.
struct FlowMirrorTask {
    /// Requests to send, keyed by the flownode peer that should receive them.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count added to the pending-mirror gauge when the task is detached.
    num_rows: usize,
}
1069
1070impl FlowMirrorTask {
1071 async fn new(
1072 cache: &TableFlownodeSetCacheRef,
1073 requests: impl Iterator<Item = &RegionInsertRequest>,
1074 ) -> Result<Self> {
1075 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1076 HashMap::new();
1077 let mut num_rows = 0;
1078
1079 for req in requests {
1080 let table_id = RegionId::from_u64(req.region_id).table_id();
1081 match src_table_reqs.get_mut(&table_id) {
1082 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1083 Some(None) => continue,
1085 _ => {
1086 let peers = cache
1088 .get(table_id)
1089 .await
1090 .context(RequestInsertsSnafu)?
1091 .unwrap_or_default()
1092 .values()
1093 .cloned()
1094 .collect::<HashSet<_>>()
1095 .into_iter()
1096 .collect::<Vec<_>>();
1097
1098 if !peers.is_empty() {
1099 let mut reqs = RegionInsertRequests::default();
1100 reqs.requests.push(req.clone());
1101 num_rows += reqs
1102 .requests
1103 .iter()
1104 .map(|r| r.rows.as_ref().unwrap().rows.len())
1105 .sum::<usize>();
1106 src_table_reqs.insert(table_id, Some((peers, reqs)));
1107 } else {
1108 src_table_reqs.insert(table_id, None);
1110 }
1111 }
1112 }
1113 }
1114
1115 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1116
1117 for (_table_id, (peers, reqs)) in src_table_reqs
1118 .into_iter()
1119 .filter_map(|(k, v)| v.map(|v| (k, v)))
1120 {
1121 if peers.len() == 1 {
1122 inserts
1124 .entry(peers[0].clone())
1125 .or_default()
1126 .requests
1127 .extend(reqs.requests);
1128 continue;
1129 } else {
1130 for flownode in peers {
1132 inserts
1133 .entry(flownode.clone())
1134 .or_default()
1135 .requests
1136 .extend(reqs.requests.clone());
1137 }
1138 }
1139 }
1140
1141 Ok(Self {
1142 requests: inserts,
1143 num_rows,
1144 })
1145 }
1146
1147 fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1148 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1149 for (peer, inserts) in self.requests {
1150 let node_manager = node_manager.clone();
1151 common_runtime::spawn_global(async move {
1152 let result = node_manager
1153 .flownode(&peer)
1154 .await
1155 .handle_inserts(inserts)
1156 .await
1157 .context(RequestInsertsSnafu);
1158
1159 match result {
1160 Ok(resp) => {
1161 let affected_rows = resp.affected_rows;
1162 crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1163 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1164 }
1165 Err(err) => {
1166 error!(err; "Failed to insert data into flownode {}", peer);
1167 }
1168 }
1169 });
1170 }
1171
1172 Ok(())
1173 }
1174}
1175
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds an in-memory table named `test_table` with exactly two columns:
    /// a millisecond time index called `ts_name` and a nullable float64 field
    /// called `field_name`. The table is backed by a dummy data source.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        // No primary key; column index 1 (the field) is the only value column.
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With `accommodate_existing_schema` and `is_single_value` both enabled, a
    /// request whose timestamp and field columns are named differently from the
    /// table's must be renamed to match the table instead of producing an
    /// alter expression.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request deliberately uses column names the table does not have.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        // Nothing needs to be added to the table...
        assert!(alter_expr.is_none());

        // ...because the request columns were renamed to the table's names.
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}