1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32 TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_services_table_name,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::Output;
39use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::ResultExt;
48use snafu::prelude::*;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{
56 APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW,
57};
58use store_api::storage::{RegionId, TableId};
59use table::TableRef;
60use table::metadata::TableInfo;
61use table::requests::{
62 AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
63 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
64};
65use table::table_reference::TableReference;
66
67use crate::error::{
68 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
69 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
70};
71use crate::expr_helper;
72use crate::region_req_factory::RegionRequestFactory;
73use crate::req_convert::common::preprocess_row_insert_requests;
74use crate::req_convert::insert::{
75 ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
76};
77use crate::statement::StatementExecutor;
78
/// Routes insert requests to the datanodes owning the target regions, creating or altering
/// tables on demand, and mirrors the writes to the flownodes of tables that feed flows.
pub struct Inserter {
    /// Used to look tables up by catalog/schema/name.
    catalog_manager: CatalogManagerRef,
    /// Used to convert requests into region requests and to find region leaders.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Provides datanode and flownode clients.
    pub(crate) node_manager: NodeManagerRef,
    /// Maps a source table id to the flownodes that must receive a mirror of its writes.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
85
/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
87
/// Kind of table to create when an insert targets a table that does not exist yet.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A metric-engine logical table backed by the named physical table.
    Logical(String),
    /// A plain physical table.
    Physical,
    /// An append-only log table.
    Log,
    /// A table whose rows are merged with the `last_non_null` merge mode.
    LastNonNull,
    /// A trace table (partitioned by trace id, with skipping indexes on key columns).
    Trace,
}

impl AutoCreateTableType {
    /// Short, stable label of the variant; used as the metric label of the on-demand
    /// create/alter timer.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
114
/// Region insert requests split by the TTL of their target tables.
///
/// Presumably the converters put requests for TTL=instant tables (see
/// `is_ttl_instant_table`) into `instant_requests` — confirm in `req_convert::insert`.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests that are written to datanodes (and mirrored to flownodes when needed).
    pub normal_requests: RegionInsertRequests,
    /// Requests for TTL=instant tables; `do_request` only mirrors these to flownodes and
    /// never writes them to datanodes.
    pub instant_requests: RegionInsertRequests,
}
129
130impl Inserter {
131 pub fn new(
132 catalog_manager: CatalogManagerRef,
133 partition_manager: PartitionRuleManagerRef,
134 node_manager: NodeManagerRef,
135 table_flownode_set_cache: TableFlownodeSetCacheRef,
136 ) -> Self {
137 Self {
138 catalog_manager,
139 partition_manager,
140 node_manager,
141 table_flownode_set_cache,
142 }
143 }
144
145 pub async fn handle_column_inserts(
146 &self,
147 requests: InsertRequests,
148 ctx: QueryContextRef,
149 statement_executor: &StatementExecutor,
150 ) -> Result<Output> {
151 let row_inserts = ColumnToRow::convert(requests)?;
152 self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
153 .await
154 }
155
156 pub async fn handle_row_inserts(
158 &self,
159 mut requests: RowInsertRequests,
160 ctx: QueryContextRef,
161 statement_executor: &StatementExecutor,
162 accommodate_existing_schema: bool,
163 is_single_value: bool,
164 ) -> Result<Output> {
165 preprocess_row_insert_requests(&mut requests.inserts)?;
166 self.handle_row_inserts_with_create_type(
167 requests,
168 ctx,
169 statement_executor,
170 AutoCreateTableType::Physical,
171 accommodate_existing_schema,
172 is_single_value,
173 )
174 .await
175 }
176
177 pub async fn handle_log_inserts(
179 &self,
180 requests: RowInsertRequests,
181 ctx: QueryContextRef,
182 statement_executor: &StatementExecutor,
183 ) -> Result<Output> {
184 self.handle_row_inserts_with_create_type(
185 requests,
186 ctx,
187 statement_executor,
188 AutoCreateTableType::Log,
189 false,
190 false,
191 )
192 .await
193 }
194
195 pub async fn handle_trace_inserts(
196 &self,
197 requests: RowInsertRequests,
198 ctx: QueryContextRef,
199 statement_executor: &StatementExecutor,
200 ) -> Result<Output> {
201 self.handle_row_inserts_with_create_type(
202 requests,
203 ctx,
204 statement_executor,
205 AutoCreateTableType::Trace,
206 false,
207 false,
208 )
209 .await
210 }
211
212 pub async fn handle_last_non_null_inserts(
214 &self,
215 requests: RowInsertRequests,
216 ctx: QueryContextRef,
217 statement_executor: &StatementExecutor,
218 accommodate_existing_schema: bool,
219 is_single_value: bool,
220 ) -> Result<Output> {
221 self.handle_row_inserts_with_create_type(
222 requests,
223 ctx,
224 statement_executor,
225 AutoCreateTableType::LastNonNull,
226 accommodate_existing_schema,
227 is_single_value,
228 )
229 .await
230 }
231
232 async fn handle_row_inserts_with_create_type(
234 &self,
235 mut requests: RowInsertRequests,
236 ctx: QueryContextRef,
237 statement_executor: &StatementExecutor,
238 create_type: AutoCreateTableType,
239 accommodate_existing_schema: bool,
240 is_single_value: bool,
241 ) -> Result<Output> {
242 requests.inserts.retain(|req| {
244 req.rows
245 .as_ref()
246 .map(|r| !r.rows.is_empty())
247 .unwrap_or_default()
248 });
249 validate_column_count_match(&requests)?;
250
251 let CreateAlterTableResult {
252 instant_table_ids,
253 table_infos,
254 } = self
255 .create_or_alter_tables_on_demand(
256 &mut requests,
257 &ctx,
258 create_type,
259 statement_executor,
260 accommodate_existing_schema,
261 is_single_value,
262 )
263 .await?;
264
265 let name_to_info = table_infos
266 .values()
267 .map(|info| (info.name.clone(), info.clone()))
268 .collect::<HashMap<_, _>>();
269 let inserts = RowToRegion::new(
270 name_to_info,
271 instant_table_ids,
272 self.partition_manager.as_ref(),
273 )
274 .convert(requests)
275 .await?;
276
277 self.do_request(inserts, &table_infos, &ctx).await
278 }
279
280 pub async fn handle_metric_row_inserts(
282 &self,
283 mut requests: RowInsertRequests,
284 ctx: QueryContextRef,
285 statement_executor: &StatementExecutor,
286 physical_table: String,
287 ) -> Result<Output> {
288 requests.inserts.retain(|req| {
290 req.rows
291 .as_ref()
292 .map(|r| !r.rows.is_empty())
293 .unwrap_or_default()
294 });
295 validate_column_count_match(&requests)?;
296
297 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
299 .await?;
300
301 let CreateAlterTableResult {
303 instant_table_ids,
304 table_infos,
305 } = self
306 .create_or_alter_tables_on_demand(
307 &mut requests,
308 &ctx,
309 AutoCreateTableType::Logical(physical_table.to_string()),
310 statement_executor,
311 true,
312 true,
313 )
314 .await?;
315 let name_to_info = table_infos
316 .values()
317 .map(|info| (info.name.clone(), info.clone()))
318 .collect::<HashMap<_, _>>();
319 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
320 .convert(requests)
321 .await?;
322
323 self.do_request(inserts, &table_infos, &ctx).await
324 }
325
326 pub async fn handle_table_insert(
327 &self,
328 request: TableInsertRequest,
329 ctx: QueryContextRef,
330 ) -> Result<Output> {
331 let catalog = request.catalog_name.as_str();
332 let schema = request.schema_name.as_str();
333 let table_name = request.table_name.as_str();
334 let table = self.get_table(catalog, schema, table_name).await?;
335 let table = table.with_context(|| TableNotFoundSnafu {
336 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
337 })?;
338 let table_info = table.table_info();
339
340 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
341 .convert(request)
342 .await?;
343
344 let table_infos =
345 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
346
347 self.do_request(inserts, &table_infos, &ctx).await
348 }
349
350 pub async fn handle_statement_insert(
351 &self,
352 insert: &Insert,
353 ctx: &QueryContextRef,
354 ) -> Result<Output> {
355 let (inserts, table_info) =
356 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
357 .convert(insert, ctx)
358 .await?;
359
360 let table_infos =
361 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
362
363 self.do_request(inserts, &table_infos, ctx).await
364 }
365}
366
367impl Inserter {
368 async fn do_request(
369 &self,
370 requests: InstantAndNormalInsertRequests,
371 table_infos: &HashMap<TableId, Arc<TableInfo>>,
372 ctx: &QueryContextRef,
373 ) -> Result<Output> {
374 let requests = fill_reqs_with_impure_default(table_infos, requests)?;
376
377 let write_cost = write_meter!(
378 ctx.current_catalog(),
379 ctx.current_schema(),
380 requests,
381 ctx.channel() as u8
382 );
383 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
384 tracing_context: TracingContext::from_current_span().to_w3c(),
385 dbname: ctx.get_db_string(),
386 ..Default::default()
387 });
388
389 let InstantAndNormalInsertRequests {
390 normal_requests,
391 instant_requests,
392 } = requests;
393
394 let flow_mirror_task = FlowMirrorTask::new(
396 &self.table_flownode_set_cache,
397 normal_requests
398 .requests
399 .iter()
400 .chain(instant_requests.requests.iter()),
401 )
402 .await?;
403 flow_mirror_task.detach(self.node_manager.clone())?;
404
405 let write_tasks = self
407 .group_requests_by_peer(normal_requests)
408 .await?
409 .into_iter()
410 .map(|(peer, inserts)| {
411 let node_manager = self.node_manager.clone();
412 let request = request_factory.build_insert(inserts);
413 common_runtime::spawn_global(async move {
414 node_manager
415 .datanode(&peer)
416 .await
417 .handle(request)
418 .await
419 .context(RequestInsertsSnafu)
420 })
421 });
422 let results = future::try_join_all(write_tasks)
423 .await
424 .context(JoinTaskSnafu)?;
425 let affected_rows = results
426 .into_iter()
427 .map(|resp| resp.map(|r| r.affected_rows))
428 .sum::<Result<AffectedRows>>()?;
429 crate::metrics::DIST_INGEST_ROW_COUNT
430 .with_label_values(&[ctx.get_db_string().as_str()])
431 .inc_by(affected_rows as u64);
432 Ok(Output::new(
433 OutputData::AffectedRows(affected_rows),
434 OutputMeta::new_with_cost(write_cost as _),
435 ))
436 }
437
438 async fn group_requests_by_peer(
439 &self,
440 requests: RegionInsertRequests,
441 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
442 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
445 for req in requests.requests {
446 let region_id = RegionId::from_u64(req.region_id);
447 requests_per_region
448 .entry(region_id)
449 .or_default()
450 .requests
451 .push(req);
452 }
453
454 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
455
456 for (region_id, reqs) in requests_per_region {
457 let peer = self
458 .partition_manager
459 .find_region_leader(region_id)
460 .await
461 .context(FindRegionLeaderSnafu)?;
462 inserts
463 .entry(peer)
464 .or_default()
465 .requests
466 .extend(reqs.requests);
467 }
468
469 Ok(inserts)
470 }
471
472 async fn create_or_alter_tables_on_demand(
485 &self,
486 requests: &mut RowInsertRequests,
487 ctx: &QueryContextRef,
488 auto_create_table_type: AutoCreateTableType,
489 statement_executor: &StatementExecutor,
490 accommodate_existing_schema: bool,
491 is_single_value: bool,
492 ) -> Result<CreateAlterTableResult> {
493 let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
494 .with_label_values(&[auto_create_table_type.as_str()])
495 .start_timer();
496
497 let catalog = ctx.current_catalog();
498 let schema = ctx.current_schema();
499
500 let mut table_infos = HashMap::new();
501 let auto_create_table_hint = ctx
503 .extension(AUTO_CREATE_TABLE_KEY)
504 .map(|v| v.parse::<bool>())
505 .transpose()
506 .map_err(|_| {
507 InvalidInsertRequestSnafu {
508 reason: "`auto_create_table` hint must be a boolean",
509 }
510 .build()
511 })?
512 .unwrap_or(true);
513 if !auto_create_table_hint {
514 let mut instant_table_ids = HashSet::new();
515 for req in &requests.inserts {
516 let table = self
517 .get_table(catalog, &schema, &req.table_name)
518 .await?
519 .context(InvalidInsertRequestSnafu {
520 reason: format!(
521 "Table `{}` does not exist, and `auto_create_table` hint is disabled",
522 req.table_name
523 ),
524 })?;
525 let table_info = table.table_info();
526 if table_info.is_ttl_instant_table() {
527 instant_table_ids.insert(table_info.table_id());
528 }
529 table_infos.insert(table_info.table_id(), table.table_info());
530 }
531 let ret = CreateAlterTableResult {
532 instant_table_ids,
533 table_infos,
534 };
535 return Ok(ret);
536 }
537
538 let mut create_tables = vec![];
539 let mut alter_tables = vec![];
540 let mut instant_table_ids = HashSet::new();
541
542 for req in &mut requests.inserts {
543 match self.get_table(catalog, &schema, &req.table_name).await? {
544 Some(table) => {
545 let table_info = table.table_info();
546 if table_info.is_ttl_instant_table() {
547 instant_table_ids.insert(table_info.table_id());
548 }
549 table_infos.insert(table_info.table_id(), table.table_info());
550 if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
551 req,
552 &table,
553 ctx,
554 accommodate_existing_schema,
555 is_single_value,
556 )? {
557 alter_tables.push(alter_expr);
558 }
559 }
560 None => {
561 let create_expr =
562 self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
563 create_tables.push(create_expr);
564 }
565 }
566 }
567
568 match auto_create_table_type {
569 AutoCreateTableType::Logical(_) => {
570 if !create_tables.is_empty() {
571 let tables = self
573 .create_logical_tables(create_tables, ctx, statement_executor)
574 .await?;
575
576 for table in tables {
577 let table_info = table.table_info();
578 if table_info.is_ttl_instant_table() {
579 instant_table_ids.insert(table_info.table_id());
580 }
581 table_infos.insert(table_info.table_id(), table.table_info());
582 }
583 }
584 if !alter_tables.is_empty() {
585 statement_executor
587 .alter_logical_tables(alter_tables, ctx.clone())
588 .await?;
589 }
590 }
591 AutoCreateTableType::Physical
592 | AutoCreateTableType::Log
593 | AutoCreateTableType::LastNonNull => {
594 for create_table in create_tables {
597 let table = self
598 .create_physical_table(create_table, None, ctx, statement_executor)
599 .await?;
600 let table_info = table.table_info();
601 if table_info.is_ttl_instant_table() {
602 instant_table_ids.insert(table_info.table_id());
603 }
604 table_infos.insert(table_info.table_id(), table.table_info());
605 }
606 for alter_expr in alter_tables.into_iter() {
607 statement_executor
608 .alter_table_inner(alter_expr, ctx.clone())
609 .await?;
610 }
611 }
612
613 AutoCreateTableType::Trace => {
614 let trace_table_name = ctx
615 .extension(TRACE_TABLE_NAME_SESSION_KEY)
616 .unwrap_or(TRACE_TABLE_NAME);
617
618 for mut create_table in create_tables {
621 if create_table.table_name == trace_services_table_name(trace_table_name) {
622 create_table
624 .table_options
625 .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
626 let table = self
627 .create_physical_table(create_table, None, ctx, statement_executor)
628 .await?;
629 let table_info = table.table_info();
630 if table_info.is_ttl_instant_table() {
631 instant_table_ids.insert(table_info.table_id());
632 }
633 table_infos.insert(table_info.table_id(), table.table_info());
634 } else {
635 let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
638 .context(CreatePartitionRulesSnafu)?;
639 let index_columns =
644 [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
645 for index_column in index_columns {
646 if let Some(col) = create_table
647 .column_defs
648 .iter_mut()
649 .find(|c| c.name == index_column)
650 {
651 col.options =
652 options_from_skipping(&SkippingIndexOptions::default())
653 .context(ColumnOptionsSnafu)?;
654 } else {
655 warn!(
656 "Column {} not found when creating index for trace table: {}.",
657 index_column, create_table.table_name
658 );
659 }
660 }
661
662 create_table.table_options.insert(
664 TABLE_DATA_MODEL.to_string(),
665 TABLE_DATA_MODEL_TRACE_V1.to_string(),
666 );
667
668 let table = self
669 .create_physical_table(
670 create_table,
671 Some(partitions),
672 ctx,
673 statement_executor,
674 )
675 .await?;
676 let table_info = table.table_info();
677 if table_info.is_ttl_instant_table() {
678 instant_table_ids.insert(table_info.table_id());
679 }
680 table_infos.insert(table_info.table_id(), table.table_info());
681 }
682 }
683 for alter_expr in alter_tables.into_iter() {
684 statement_executor
685 .alter_table_inner(alter_expr, ctx.clone())
686 .await?;
687 }
688 }
689 }
690
691 Ok(CreateAlterTableResult {
692 instant_table_ids,
693 table_infos,
694 })
695 }
696
697 async fn create_physical_table_on_demand(
698 &self,
699 ctx: &QueryContextRef,
700 physical_table: String,
701 statement_executor: &StatementExecutor,
702 ) -> Result<()> {
703 let catalog_name = ctx.current_catalog();
704 let schema_name = ctx.current_schema();
705
706 if self
708 .get_table(catalog_name, &schema_name, &physical_table)
709 .await?
710 .is_some()
711 {
712 return Ok(());
713 }
714
715 let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
716 info!("Physical metric table `{table_reference}` does not exist, try creating table");
717
718 let default_schema = vec![
720 ColumnSchema {
721 column_name: GREPTIME_TIMESTAMP.to_string(),
722 datatype: ColumnDataType::TimestampMillisecond as _,
723 semantic_type: SemanticType::Timestamp as _,
724 datatype_extension: None,
725 options: None,
726 },
727 ColumnSchema {
728 column_name: GREPTIME_VALUE.to_string(),
729 datatype: ColumnDataType::Float64 as _,
730 semantic_type: SemanticType::Field as _,
731 datatype_extension: None,
732 options: None,
733 },
734 ];
735 let create_table_expr =
736 &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
737
738 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
739 create_table_expr
740 .table_options
741 .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
742
743 let res = statement_executor
745 .create_table_inner(create_table_expr, None, ctx.clone())
746 .await;
747
748 match res {
749 Ok(_) => {
750 info!("Successfully created table {table_reference}",);
751 Ok(())
752 }
753 Err(err) => {
754 error!(err; "Failed to create table {table_reference}");
755 Err(err)
756 }
757 }
758 }
759
760 async fn get_table(
761 &self,
762 catalog: &str,
763 schema: &str,
764 table: &str,
765 ) -> Result<Option<TableRef>> {
766 self.catalog_manager
767 .table(catalog, schema, table, None)
768 .await
769 .context(CatalogSnafu)
770 }
771
772 fn get_create_table_expr_on_demand(
773 &self,
774 req: &RowInsertRequest,
775 create_type: &AutoCreateTableType,
776 ctx: &QueryContextRef,
777 ) -> Result<CreateTableExpr> {
778 let mut table_options = std::collections::HashMap::with_capacity(4);
779 fill_table_options_for_create(&mut table_options, create_type, ctx);
780
781 let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
782 METRIC_ENGINE_NAME
784 } else {
785 default_engine()
786 };
787
788 let schema = ctx.current_schema();
789 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
790 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
792 let mut create_table_expr =
793 build_create_table_expr(&table_ref, request_schema, engine_name)?;
794
795 info!("Table `{table_ref}` does not exist, try creating table");
796 create_table_expr.table_options.extend(table_options);
797 Ok(create_table_expr)
798 }
799
800 fn get_alter_table_expr_on_demand(
808 &self,
809 req: &mut RowInsertRequest,
810 table: &TableRef,
811 ctx: &QueryContextRef,
812 accommodate_existing_schema: bool,
813 is_single_value: bool,
814 ) -> Result<Option<AlterTableExpr>> {
815 let catalog_name = ctx.current_catalog();
816 let schema_name = ctx.current_schema();
817 let table_name = table.table_info().name.clone();
818
819 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
820 let column_exprs = ColumnExpr::from_column_schemas(request_schema);
821 let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
822 let Some(mut add_columns) = add_columns else {
823 return Ok(None);
824 };
825
826 if accommodate_existing_schema {
828 let table_schema = table.schema();
829 let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
831 let mut field_col_name = None;
833 if is_single_value {
834 let mut multiple_field_cols = false;
835 table.field_columns().for_each(|col| {
836 if field_col_name.is_none() {
837 field_col_name = Some(col.name.clone());
838 } else {
839 multiple_field_cols = true;
840 }
841 });
842 if multiple_field_cols {
843 field_col_name = None;
844 }
845 }
846
847 if let Some(rows) = req.rows.as_mut() {
849 for col in &mut rows.schema {
850 match col.semantic_type {
851 x if x == SemanticType::Timestamp as i32 => {
852 if let Some(ref ts_name) = ts_col_name
853 && col.column_name != *ts_name
854 {
855 col.column_name = ts_name.clone();
856 }
857 }
858 x if x == SemanticType::Field as i32 => {
859 if let Some(ref field_name) = field_col_name
860 && col.column_name != *field_name
861 {
862 col.column_name = field_name.clone();
863 }
864 }
865 _ => {}
866 }
867 }
868 }
869
870 add_columns.add_columns.retain(|col| {
872 let def = col.column_def.as_ref().unwrap();
873 def.semantic_type == SemanticType::Tag as i32
874 || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
875 });
876
877 if add_columns.add_columns.is_empty() {
878 return Ok(None);
879 }
880 }
881
882 Ok(Some(AlterTableExpr {
883 catalog_name: catalog_name.to_string(),
884 schema_name: schema_name.to_string(),
885 table_name: table_name.to_string(),
886 kind: Some(Kind::AddColumns(add_columns)),
887 }))
888 }
889
890 async fn create_physical_table(
892 &self,
893 mut create_table_expr: CreateTableExpr,
894 partitions: Option<Partitions>,
895 ctx: &QueryContextRef,
896 statement_executor: &StatementExecutor,
897 ) -> Result<TableRef> {
898 {
899 let table_ref = TableReference::full(
900 &create_table_expr.catalog_name,
901 &create_table_expr.schema_name,
902 &create_table_expr.table_name,
903 );
904
905 info!("Table `{table_ref}` does not exist, try creating table");
906 }
907 let res = statement_executor
908 .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
909 .await;
910
911 let table_ref = TableReference::full(
912 &create_table_expr.catalog_name,
913 &create_table_expr.schema_name,
914 &create_table_expr.table_name,
915 );
916
917 match res {
918 Ok(table) => {
919 info!(
920 "Successfully created table {} with options: {:?}",
921 table_ref, create_table_expr.table_options,
922 );
923 Ok(table)
924 }
925 Err(err) => {
926 error!(err; "Failed to create table {}", table_ref);
927 Err(err)
928 }
929 }
930 }
931
932 async fn create_logical_tables(
933 &self,
934 create_table_exprs: Vec<CreateTableExpr>,
935 ctx: &QueryContextRef,
936 statement_executor: &StatementExecutor,
937 ) -> Result<Vec<TableRef>> {
938 let res = statement_executor
939 .create_logical_tables(&create_table_exprs, ctx.clone())
940 .await;
941
942 match res {
943 Ok(res) => {
944 info!("Successfully created logical tables");
945 Ok(res)
946 }
947 Err(err) => {
948 let failed_tables = create_table_exprs
949 .into_iter()
950 .map(|expr| {
951 format!(
952 "{}.{}.{}",
953 expr.catalog_name, expr.schema_name, expr.table_name
954 )
955 })
956 .collect::<Vec<_>>();
957 error!(
958 err;
959 "Failed to create logical tables {:?}",
960 failed_tables
961 );
962 Err(err)
963 }
964 }
965 }
966
967 pub fn node_manager(&self) -> &NodeManagerRef {
968 &self.node_manager
969 }
970
971 pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
972 &self.partition_manager
973 }
974}
975
976fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
977 for request in &requests.inserts {
978 let rows = request.rows.as_ref().unwrap();
979 let column_count = rows.schema.len();
980 rows.rows.iter().try_for_each(|r| {
981 ensure!(
982 r.values.len() == column_count,
983 InvalidInsertRequestSnafu {
984 reason: format!(
985 "column count mismatch, columns: {}, values: {}",
986 column_count,
987 r.values.len()
988 )
989 }
990 );
991 Ok(())
992 })?;
993 }
994 Ok(())
995}
996
997pub fn fill_table_options_for_create(
999 table_options: &mut std::collections::HashMap<String, String>,
1000 create_type: &AutoCreateTableType,
1001 ctx: &QueryContextRef,
1002) {
1003 for key in VALID_TABLE_OPTION_KEYS {
1004 if let Some(value) = ctx.extension(key) {
1005 table_options.insert(key.to_string(), value.to_string());
1006 }
1007 }
1008
1009 match create_type {
1010 AutoCreateTableType::Logical(physical_table) => {
1011 table_options.insert(
1012 LOGICAL_TABLE_METADATA_KEY.to_string(),
1013 physical_table.to_string(),
1014 );
1015 }
1016 AutoCreateTableType::Physical => {
1017 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1018 table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1019 }
1020 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1021 table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1022 }
1023 if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1024 table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1025 table_options.insert(
1027 COMPACTION_TYPE.to_string(),
1028 COMPACTION_TYPE_TWCS.to_string(),
1029 );
1030 }
1031 }
1032 AutoCreateTableType::Log => {
1035 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1036 }
1037 AutoCreateTableType::LastNonNull => {
1038 table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1039 }
1040 AutoCreateTableType::Trace => {
1041 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1042 }
1043 }
1044}
1045
1046pub fn build_create_table_expr(
1047 table: &TableReference,
1048 request_schema: &[ColumnSchema],
1049 engine: &str,
1050) -> Result<CreateTableExpr> {
1051 expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1052}
1053
/// Tables resolved (and created or altered on demand) for a batch of row inserts.
struct CreateAlterTableResult {
    /// Ids of resolved tables for which `is_ttl_instant_table` holds.
    instant_table_ids: HashSet<TableId>,
    /// Metadata of every resolved table, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1061
1062struct FlowMirrorTask {
1063 requests: HashMap<Peer, RegionInsertRequests>,
1064 num_rows: usize,
1065}
1066
1067impl FlowMirrorTask {
1068 async fn new(
1069 cache: &TableFlownodeSetCacheRef,
1070 requests: impl Iterator<Item = &RegionInsertRequest>,
1071 ) -> Result<Self> {
1072 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1073 HashMap::new();
1074 let mut num_rows = 0;
1075
1076 for req in requests {
1077 let table_id = RegionId::from_u64(req.region_id).table_id();
1078 match src_table_reqs.get_mut(&table_id) {
1079 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1080 Some(None) => continue,
1082 _ => {
1083 let peers = cache
1085 .get(table_id)
1086 .await
1087 .context(RequestInsertsSnafu)?
1088 .unwrap_or_default()
1089 .values()
1090 .cloned()
1091 .collect::<HashSet<_>>()
1092 .into_iter()
1093 .collect::<Vec<_>>();
1094
1095 if !peers.is_empty() {
1096 let mut reqs = RegionInsertRequests::default();
1097 reqs.requests.push(req.clone());
1098 num_rows += reqs
1099 .requests
1100 .iter()
1101 .map(|r| r.rows.as_ref().unwrap().rows.len())
1102 .sum::<usize>();
1103 src_table_reqs.insert(table_id, Some((peers, reqs)));
1104 } else {
1105 src_table_reqs.insert(table_id, None);
1107 }
1108 }
1109 }
1110 }
1111
1112 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1113
1114 for (_table_id, (peers, reqs)) in src_table_reqs
1115 .into_iter()
1116 .filter_map(|(k, v)| v.map(|v| (k, v)))
1117 {
1118 if peers.len() == 1 {
1119 inserts
1121 .entry(peers[0].clone())
1122 .or_default()
1123 .requests
1124 .extend(reqs.requests);
1125 continue;
1126 } else {
1127 for flownode in peers {
1129 inserts
1130 .entry(flownode.clone())
1131 .or_default()
1132 .requests
1133 .extend(reqs.requests.clone());
1134 }
1135 }
1136 }
1137
1138 Ok(Self {
1139 requests: inserts,
1140 num_rows,
1141 })
1142 }
1143
1144 fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1145 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1146 for (peer, inserts) in self.requests {
1147 let node_manager = node_manager.clone();
1148 common_runtime::spawn_global(async move {
1149 let result = node_manager
1150 .flownode(&peer)
1151 .await
1152 .handle_inserts(inserts)
1153 .await
1154 .context(RequestInsertsSnafu);
1155
1156 match result {
1157 Ok(resp) => {
1158 let affected_rows = resp.affected_rows;
1159 crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1160 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1161 }
1162 Err(err) => {
1163 error!(err; "Failed to insert data into flownode {}", peer);
1164 }
1165 }
1166 });
1167 }
1168
1169 Ok(())
1170 }
1171}
1172
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a single-region `mito` table `test_table` (id 1) with a millisecond time index
    /// named `ts_name` and one nullable `Float64` field named `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let columns = vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ];
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(columns)
            .unwrap()
            .build()
            .unwrap();
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();
        Arc::new(table::Table::new(
            Arc::new(info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// Builds an `Inserter` over an empty in-memory catalog and a mocked metadata backend.
    async fn new_test_inserter() -> Inserter {
        let kv_backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(kv_backend.clone()).await;
        let flownode_set_cache =
            new_table_flownode_set_cache(String::new(), Cache::new(100), kv_backend);
        Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            partition_manager,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(flownode_set_cache),
        )
    }

    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request names its timestamp and field columns differently from the table.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));
        let inserter = new_test_inserter().await;

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        // Both columns map onto existing ones, so no column needs to be added.
        assert!(alter_expr.is_none());

        // The request schema now uses the table's own column names.
        let schema = &req.rows.as_ref().unwrap().schema;
        assert_eq!(schema[0].column_name, ts_name);
        assert_eq!(schema[1].column_name, field_name);
    }
}