1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
32 TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
39use common_query::Output;
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::prelude::*;
48use snafu::ResultExt;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
56use store_api::storage::{RegionId, TableId};
57use table::metadata::TableInfo;
58use table::requests::{
59 InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
60 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
61};
62use table::table_reference::TableReference;
63use table::TableRef;
64
65use crate::error::{
66 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
67 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
68};
69use crate::expr_helper;
70use crate::region_req_factory::RegionRequestFactory;
71use crate::req_convert::common::preprocess_row_insert_requests;
72use crate::req_convert::insert::{
73 fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
74};
75use crate::statement::StatementExecutor;
76
/// Handles insert requests: resolves (and, when needed, creates or alters) the target
/// tables, converts the requests into per-region requests and dispatches them.
pub struct Inserter {
    /// Used to look tables up by catalog, schema and table name.
    catalog_manager: CatalogManagerRef,
    /// Used when converting requests into region requests and to find region leaders.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Provides the datanode and flownode clients that requests are sent through.
    pub(crate) node_manager: NodeManagerRef,
    /// Queried per table id to find the flownodes that should receive mirrored inserts.
    table_flownode_set_cache: TableFlownodeSetCacheRef,
}
83
/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
85
/// The flavour of table to create when an insert targets a table that does not exist yet.
#[derive(Clone)]
enum AutoCreateTableType {
    /// A metric-engine logical table; the payload is the name of its physical table.
    Logical(String),
    /// A regular table.
    Physical,
    /// A table created for log ingestion.
    Log,
    /// A table created with the `last_non_null` merge mode.
    LastNonNull,
    /// A table created for trace ingestion.
    Trace,
}

impl AutoCreateTableType {
    /// Returns a short, static name for this variant (used as a metric label value).
    fn as_str(&self) -> &'static str {
        match self {
            Self::Physical => "physical",
            Self::Logical(_) => "logical",
            Self::Log => "log",
            Self::Trace => "trace",
            Self::LastNonNull => "last_non_null",
        }
    }
}
112
/// Region insert requests split into two groups that are dispatched differently.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests that are written to datanodes and also mirrored to flownodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests that are only mirrored to flownodes and never written to datanodes.
    // NOTE(review): presumably these target tables flagged by `is_ttl_instant_table()` — confirm
    // against the request converters.
    pub instant_requests: RegionInsertRequests,
}
127
128impl Inserter {
129 pub fn new(
130 catalog_manager: CatalogManagerRef,
131 partition_manager: PartitionRuleManagerRef,
132 node_manager: NodeManagerRef,
133 table_flownode_set_cache: TableFlownodeSetCacheRef,
134 ) -> Self {
135 Self {
136 catalog_manager,
137 partition_manager,
138 node_manager,
139 table_flownode_set_cache,
140 }
141 }
142
143 pub async fn handle_column_inserts(
144 &self,
145 requests: InsertRequests,
146 ctx: QueryContextRef,
147 statement_executor: &StatementExecutor,
148 ) -> Result<Output> {
149 let row_inserts = ColumnToRow::convert(requests)?;
150 self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
151 .await
152 }
153
154 pub async fn handle_row_inserts(
156 &self,
157 mut requests: RowInsertRequests,
158 ctx: QueryContextRef,
159 statement_executor: &StatementExecutor,
160 accommodate_existing_schema: bool,
161 is_single_value: bool,
162 ) -> Result<Output> {
163 preprocess_row_insert_requests(&mut requests.inserts)?;
164 self.handle_row_inserts_with_create_type(
165 requests,
166 ctx,
167 statement_executor,
168 AutoCreateTableType::Physical,
169 accommodate_existing_schema,
170 is_single_value,
171 )
172 .await
173 }
174
175 pub async fn handle_log_inserts(
177 &self,
178 requests: RowInsertRequests,
179 ctx: QueryContextRef,
180 statement_executor: &StatementExecutor,
181 ) -> Result<Output> {
182 self.handle_row_inserts_with_create_type(
183 requests,
184 ctx,
185 statement_executor,
186 AutoCreateTableType::Log,
187 false,
188 false,
189 )
190 .await
191 }
192
193 pub async fn handle_trace_inserts(
194 &self,
195 requests: RowInsertRequests,
196 ctx: QueryContextRef,
197 statement_executor: &StatementExecutor,
198 ) -> Result<Output> {
199 self.handle_row_inserts_with_create_type(
200 requests,
201 ctx,
202 statement_executor,
203 AutoCreateTableType::Trace,
204 false,
205 false,
206 )
207 .await
208 }
209
210 pub async fn handle_last_non_null_inserts(
212 &self,
213 requests: RowInsertRequests,
214 ctx: QueryContextRef,
215 statement_executor: &StatementExecutor,
216 accommodate_existing_schema: bool,
217 is_single_value: bool,
218 ) -> Result<Output> {
219 self.handle_row_inserts_with_create_type(
220 requests,
221 ctx,
222 statement_executor,
223 AutoCreateTableType::LastNonNull,
224 accommodate_existing_schema,
225 is_single_value,
226 )
227 .await
228 }
229
230 async fn handle_row_inserts_with_create_type(
232 &self,
233 mut requests: RowInsertRequests,
234 ctx: QueryContextRef,
235 statement_executor: &StatementExecutor,
236 create_type: AutoCreateTableType,
237 accommodate_existing_schema: bool,
238 is_single_value: bool,
239 ) -> Result<Output> {
240 requests.inserts.retain(|req| {
242 req.rows
243 .as_ref()
244 .map(|r| !r.rows.is_empty())
245 .unwrap_or_default()
246 });
247 validate_column_count_match(&requests)?;
248
249 let CreateAlterTableResult {
250 instant_table_ids,
251 table_infos,
252 } = self
253 .create_or_alter_tables_on_demand(
254 &mut requests,
255 &ctx,
256 create_type,
257 statement_executor,
258 accommodate_existing_schema,
259 is_single_value,
260 )
261 .await?;
262
263 let name_to_info = table_infos
264 .values()
265 .map(|info| (info.name.clone(), info.clone()))
266 .collect::<HashMap<_, _>>();
267 let inserts = RowToRegion::new(
268 name_to_info,
269 instant_table_ids,
270 self.partition_manager.as_ref(),
271 )
272 .convert(requests)
273 .await?;
274
275 self.do_request(inserts, &table_infos, &ctx).await
276 }
277
278 pub async fn handle_metric_row_inserts(
280 &self,
281 mut requests: RowInsertRequests,
282 ctx: QueryContextRef,
283 statement_executor: &StatementExecutor,
284 physical_table: String,
285 ) -> Result<Output> {
286 requests.inserts.retain(|req| {
288 req.rows
289 .as_ref()
290 .map(|r| !r.rows.is_empty())
291 .unwrap_or_default()
292 });
293 validate_column_count_match(&requests)?;
294
295 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
297 .await?;
298
299 let CreateAlterTableResult {
301 instant_table_ids,
302 table_infos,
303 } = self
304 .create_or_alter_tables_on_demand(
305 &mut requests,
306 &ctx,
307 AutoCreateTableType::Logical(physical_table.to_string()),
308 statement_executor,
309 true,
310 true,
311 )
312 .await?;
313 let name_to_info = table_infos
314 .values()
315 .map(|info| (info.name.clone(), info.clone()))
316 .collect::<HashMap<_, _>>();
317 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
318 .convert(requests)
319 .await?;
320
321 self.do_request(inserts, &table_infos, &ctx).await
322 }
323
324 pub async fn handle_table_insert(
325 &self,
326 request: TableInsertRequest,
327 ctx: QueryContextRef,
328 ) -> Result<Output> {
329 let catalog = request.catalog_name.as_str();
330 let schema = request.schema_name.as_str();
331 let table_name = request.table_name.as_str();
332 let table = self.get_table(catalog, schema, table_name).await?;
333 let table = table.with_context(|| TableNotFoundSnafu {
334 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
335 })?;
336 let table_info = table.table_info();
337
338 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
339 .convert(request)
340 .await?;
341
342 let table_infos =
343 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
344
345 self.do_request(inserts, &table_infos, &ctx).await
346 }
347
348 pub async fn handle_statement_insert(
349 &self,
350 insert: &Insert,
351 ctx: &QueryContextRef,
352 ) -> Result<Output> {
353 let (inserts, table_info) =
354 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
355 .convert(insert, ctx)
356 .await?;
357
358 let table_infos =
359 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
360
361 self.do_request(inserts, &table_infos, ctx).await
362 }
363}
364
365impl Inserter {
    /// Dispatches converted region insert requests and reports the affected row count.
    ///
    /// Normal and instant requests are both handed to a [`FlowMirrorTask`], which is detached
    /// before the datanode writes start. Only the normal requests are then grouped by region
    /// leader and written to the datanodes, one spawned task per peer.
    ///
    /// # Errors
    /// Returns an error if filling defaults fails, if building or detaching the mirror task
    /// fails, if a region leader cannot be found, if a spawned task cannot be joined, or if any
    /// datanode rejects its request.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // NOTE(review): presumably fills in columns whose defaults are impure (must be evaluated
        // once, here) — confirm against `fill_reqs_with_impure_default`.
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        // The value produced by the metering macro is reported as the cost of this output.
        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        // Every region request built below shares this header (tracing context + db name).
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror both kinds of requests to flownodes; the task runs in the background.
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Only normal requests are written to datanodes: one spawned task per leader peer.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        // A join failure is reported as a `JoinTask` error.
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Sum the affected rows; the first failed datanode response aborts with its error.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
433
434 async fn group_requests_by_peer(
435 &self,
436 requests: RegionInsertRequests,
437 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
438 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
441 for req in requests.requests {
442 let region_id = RegionId::from_u64(req.region_id);
443 requests_per_region
444 .entry(region_id)
445 .or_default()
446 .requests
447 .push(req);
448 }
449
450 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
451
452 for (region_id, reqs) in requests_per_region {
453 let peer = self
454 .partition_manager
455 .find_region_leader(region_id)
456 .await
457 .context(FindRegionLeaderSnafu)?;
458 inserts
459 .entry(peer)
460 .or_default()
461 .requests
462 .extend(reqs.requests);
463 }
464
465 Ok(inserts)
466 }
467
468 async fn create_or_alter_tables_on_demand(
481 &self,
482 requests: &mut RowInsertRequests,
483 ctx: &QueryContextRef,
484 auto_create_table_type: AutoCreateTableType,
485 statement_executor: &StatementExecutor,
486 accommodate_existing_schema: bool,
487 is_single_value: bool,
488 ) -> Result<CreateAlterTableResult> {
489 let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
490 .with_label_values(&[auto_create_table_type.as_str()])
491 .start_timer();
492
493 let catalog = ctx.current_catalog();
494 let schema = ctx.current_schema();
495
496 let mut table_infos = HashMap::new();
497 let auto_create_table_hint = ctx
499 .extension(AUTO_CREATE_TABLE_KEY)
500 .map(|v| v.parse::<bool>())
501 .transpose()
502 .map_err(|_| {
503 InvalidInsertRequestSnafu {
504 reason: "`auto_create_table` hint must be a boolean",
505 }
506 .build()
507 })?
508 .unwrap_or(true);
509 if !auto_create_table_hint {
510 let mut instant_table_ids = HashSet::new();
511 for req in &requests.inserts {
512 let table = self
513 .get_table(catalog, &schema, &req.table_name)
514 .await?
515 .context(InvalidInsertRequestSnafu {
516 reason: format!(
517 "Table `{}` does not exist, and `auto_create_table` hint is disabled",
518 req.table_name
519 ),
520 })?;
521 let table_info = table.table_info();
522 if table_info.is_ttl_instant_table() {
523 instant_table_ids.insert(table_info.table_id());
524 }
525 table_infos.insert(table_info.table_id(), table.table_info());
526 }
527 let ret = CreateAlterTableResult {
528 instant_table_ids,
529 table_infos,
530 };
531 return Ok(ret);
532 }
533
534 let mut create_tables = vec![];
535 let mut alter_tables = vec![];
536 let mut instant_table_ids = HashSet::new();
537
538 for req in &mut requests.inserts {
539 match self.get_table(catalog, &schema, &req.table_name).await? {
540 Some(table) => {
541 let table_info = table.table_info();
542 if table_info.is_ttl_instant_table() {
543 instant_table_ids.insert(table_info.table_id());
544 }
545 table_infos.insert(table_info.table_id(), table.table_info());
546 if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
547 req,
548 &table,
549 ctx,
550 accommodate_existing_schema,
551 is_single_value,
552 )? {
553 alter_tables.push(alter_expr);
554 }
555 }
556 None => {
557 let create_expr =
558 self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
559 create_tables.push(create_expr);
560 }
561 }
562 }
563
564 match auto_create_table_type {
565 AutoCreateTableType::Logical(_) => {
566 if !create_tables.is_empty() {
567 let tables = self
569 .create_logical_tables(create_tables, ctx, statement_executor)
570 .await?;
571
572 for table in tables {
573 let table_info = table.table_info();
574 if table_info.is_ttl_instant_table() {
575 instant_table_ids.insert(table_info.table_id());
576 }
577 table_infos.insert(table_info.table_id(), table.table_info());
578 }
579 }
580 if !alter_tables.is_empty() {
581 statement_executor
583 .alter_logical_tables(alter_tables, ctx.clone())
584 .await?;
585 }
586 }
587 AutoCreateTableType::Physical
588 | AutoCreateTableType::Log
589 | AutoCreateTableType::LastNonNull => {
590 for create_table in create_tables {
593 let table = self
594 .create_physical_table(create_table, None, ctx, statement_executor)
595 .await?;
596 let table_info = table.table_info();
597 if table_info.is_ttl_instant_table() {
598 instant_table_ids.insert(table_info.table_id());
599 }
600 table_infos.insert(table_info.table_id(), table.table_info());
601 }
602 for alter_expr in alter_tables.into_iter() {
603 statement_executor
604 .alter_table_inner(alter_expr, ctx.clone())
605 .await?;
606 }
607 }
608
609 AutoCreateTableType::Trace => {
610 let trace_table_name = ctx
611 .extension(TRACE_TABLE_NAME_SESSION_KEY)
612 .unwrap_or(TRACE_TABLE_NAME);
613
614 for mut create_table in create_tables {
617 if create_table.table_name == trace_services_table_name(trace_table_name) {
618 create_table
620 .table_options
621 .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
622 let table = self
623 .create_physical_table(create_table, None, ctx, statement_executor)
624 .await?;
625 let table_info = table.table_info();
626 if table_info.is_ttl_instant_table() {
627 instant_table_ids.insert(table_info.table_id());
628 }
629 table_infos.insert(table_info.table_id(), table.table_info());
630 } else {
631 let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
634 .context(CreatePartitionRulesSnafu)?;
635 let index_columns =
640 [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
641 for index_column in index_columns {
642 if let Some(col) = create_table
643 .column_defs
644 .iter_mut()
645 .find(|c| c.name == index_column)
646 {
647 col.options =
648 options_from_skipping(&SkippingIndexOptions::default())
649 .context(ColumnOptionsSnafu)?;
650 } else {
651 warn!(
652 "Column {} not found when creating index for trace table: {}.",
653 index_column, create_table.table_name
654 );
655 }
656 }
657
658 create_table.table_options.insert(
660 TABLE_DATA_MODEL.to_string(),
661 TABLE_DATA_MODEL_TRACE_V1.to_string(),
662 );
663
664 let table = self
665 .create_physical_table(
666 create_table,
667 Some(partitions),
668 ctx,
669 statement_executor,
670 )
671 .await?;
672 let table_info = table.table_info();
673 if table_info.is_ttl_instant_table() {
674 instant_table_ids.insert(table_info.table_id());
675 }
676 table_infos.insert(table_info.table_id(), table.table_info());
677 }
678 }
679 for alter_expr in alter_tables.into_iter() {
680 statement_executor
681 .alter_table_inner(alter_expr, ctx.clone())
682 .await?;
683 }
684 }
685 }
686
687 Ok(CreateAlterTableResult {
688 instant_table_ids,
689 table_infos,
690 })
691 }
692
693 async fn create_physical_table_on_demand(
694 &self,
695 ctx: &QueryContextRef,
696 physical_table: String,
697 statement_executor: &StatementExecutor,
698 ) -> Result<()> {
699 let catalog_name = ctx.current_catalog();
700 let schema_name = ctx.current_schema();
701
702 if self
704 .get_table(catalog_name, &schema_name, &physical_table)
705 .await?
706 .is_some()
707 {
708 return Ok(());
709 }
710
711 let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
712 info!("Physical metric table `{table_reference}` does not exist, try creating table");
713
714 let default_schema = vec![
716 ColumnSchema {
717 column_name: GREPTIME_TIMESTAMP.to_string(),
718 datatype: ColumnDataType::TimestampMillisecond as _,
719 semantic_type: SemanticType::Timestamp as _,
720 datatype_extension: None,
721 options: None,
722 },
723 ColumnSchema {
724 column_name: GREPTIME_VALUE.to_string(),
725 datatype: ColumnDataType::Float64 as _,
726 semantic_type: SemanticType::Field as _,
727 datatype_extension: None,
728 options: None,
729 },
730 ];
731 let create_table_expr =
732 &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
733
734 create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
735 create_table_expr
736 .table_options
737 .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
738
739 let res = statement_executor
741 .create_table_inner(create_table_expr, None, ctx.clone())
742 .await;
743
744 match res {
745 Ok(_) => {
746 info!("Successfully created table {table_reference}",);
747 Ok(())
748 }
749 Err(err) => {
750 error!(err; "Failed to create table {table_reference}");
751 Err(err)
752 }
753 }
754 }
755
756 async fn get_table(
757 &self,
758 catalog: &str,
759 schema: &str,
760 table: &str,
761 ) -> Result<Option<TableRef>> {
762 self.catalog_manager
763 .table(catalog, schema, table, None)
764 .await
765 .context(CatalogSnafu)
766 }
767
768 fn get_create_table_expr_on_demand(
769 &self,
770 req: &RowInsertRequest,
771 create_type: &AutoCreateTableType,
772 ctx: &QueryContextRef,
773 ) -> Result<CreateTableExpr> {
774 let mut table_options = Vec::with_capacity(4);
775 for key in VALID_TABLE_OPTION_KEYS {
776 if let Some(value) = ctx.extension(key) {
777 table_options.push((key, value));
778 }
779 }
780
781 let mut engine_name = default_engine();
782 match create_type {
783 AutoCreateTableType::Logical(physical_table) => {
784 engine_name = METRIC_ENGINE_NAME;
785 table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
786 }
787 AutoCreateTableType::Physical => {
788 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
789 table_options.push((APPEND_MODE_KEY, append_mode));
790 }
791 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
792 table_options.push((MERGE_MODE_KEY, merge_mode));
793 }
794 }
795 AutoCreateTableType::Log => {
798 table_options.push((APPEND_MODE_KEY, "true"));
799 }
800 AutoCreateTableType::LastNonNull => {
801 table_options.push((MERGE_MODE_KEY, "last_non_null"));
802 }
803 AutoCreateTableType::Trace => {
804 table_options.push((APPEND_MODE_KEY, "true"));
805 }
806 }
807
808 let schema = ctx.current_schema();
809 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
810 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
812 let mut create_table_expr =
813 build_create_table_expr(&table_ref, request_schema, engine_name)?;
814
815 info!("Table `{table_ref}` does not exist, try creating table");
816 for (k, v) in table_options {
817 create_table_expr
818 .table_options
819 .insert(k.to_string(), v.to_string());
820 }
821
822 Ok(create_table_expr)
823 }
824
825 fn get_alter_table_expr_on_demand(
833 &self,
834 req: &mut RowInsertRequest,
835 table: &TableRef,
836 ctx: &QueryContextRef,
837 accommodate_existing_schema: bool,
838 is_single_value: bool,
839 ) -> Result<Option<AlterTableExpr>> {
840 let catalog_name = ctx.current_catalog();
841 let schema_name = ctx.current_schema();
842 let table_name = table.table_info().name.clone();
843
844 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
845 let column_exprs = ColumnExpr::from_column_schemas(request_schema);
846 let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
847 let Some(mut add_columns) = add_columns else {
848 return Ok(None);
849 };
850
851 if accommodate_existing_schema {
853 let table_schema = table.schema();
854 let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
856 let mut field_col_name = None;
858 if is_single_value {
859 let mut multiple_field_cols = false;
860 table.field_columns().for_each(|col| {
861 if field_col_name.is_none() {
862 field_col_name = Some(col.name.clone());
863 } else {
864 multiple_field_cols = true;
865 }
866 });
867 if multiple_field_cols {
868 field_col_name = None;
869 }
870 }
871
872 if let Some(rows) = req.rows.as_mut() {
874 for col in &mut rows.schema {
875 match col.semantic_type {
876 x if x == SemanticType::Timestamp as i32 => {
877 if let Some(ref ts_name) = ts_col_name {
878 if col.column_name != *ts_name {
879 col.column_name = ts_name.clone();
880 }
881 }
882 }
883 x if x == SemanticType::Field as i32 => {
884 if let Some(ref field_name) = field_col_name {
885 if col.column_name != *field_name {
886 col.column_name = field_name.clone();
887 }
888 }
889 }
890 _ => {}
891 }
892 }
893 }
894
895 add_columns.add_columns.retain(|col| {
897 let def = col.column_def.as_ref().unwrap();
898 def.semantic_type == SemanticType::Tag as i32
899 || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
900 });
901
902 if add_columns.add_columns.is_empty() {
903 return Ok(None);
904 }
905 }
906
907 Ok(Some(AlterTableExpr {
908 catalog_name: catalog_name.to_string(),
909 schema_name: schema_name.to_string(),
910 table_name: table_name.to_string(),
911 kind: Some(Kind::AddColumns(add_columns)),
912 }))
913 }
914
915 async fn create_physical_table(
917 &self,
918 mut create_table_expr: CreateTableExpr,
919 partitions: Option<Partitions>,
920 ctx: &QueryContextRef,
921 statement_executor: &StatementExecutor,
922 ) -> Result<TableRef> {
923 {
924 let table_ref = TableReference::full(
925 &create_table_expr.catalog_name,
926 &create_table_expr.schema_name,
927 &create_table_expr.table_name,
928 );
929
930 info!("Table `{table_ref}` does not exist, try creating table");
931 }
932 let res = statement_executor
933 .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
934 .await;
935
936 let table_ref = TableReference::full(
937 &create_table_expr.catalog_name,
938 &create_table_expr.schema_name,
939 &create_table_expr.table_name,
940 );
941
942 match res {
943 Ok(table) => {
944 info!(
945 "Successfully created table {} with options: {:?}",
946 table_ref, create_table_expr.table_options,
947 );
948 Ok(table)
949 }
950 Err(err) => {
951 error!(err; "Failed to create table {}", table_ref);
952 Err(err)
953 }
954 }
955 }
956
957 async fn create_logical_tables(
958 &self,
959 create_table_exprs: Vec<CreateTableExpr>,
960 ctx: &QueryContextRef,
961 statement_executor: &StatementExecutor,
962 ) -> Result<Vec<TableRef>> {
963 let res = statement_executor
964 .create_logical_tables(&create_table_exprs, ctx.clone())
965 .await;
966
967 match res {
968 Ok(res) => {
969 info!("Successfully created logical tables");
970 Ok(res)
971 }
972 Err(err) => {
973 let failed_tables = create_table_exprs
974 .into_iter()
975 .map(|expr| {
976 format!(
977 "{}.{}.{}",
978 expr.catalog_name, expr.schema_name, expr.table_name
979 )
980 })
981 .collect::<Vec<_>>();
982 error!(
983 err;
984 "Failed to create logical tables {:?}",
985 failed_tables
986 );
987 Err(err)
988 }
989 }
990 }
991}
992
993fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
994 for request in &requests.inserts {
995 let rows = request.rows.as_ref().unwrap();
996 let column_count = rows.schema.len();
997 rows.rows.iter().try_for_each(|r| {
998 ensure!(
999 r.values.len() == column_count,
1000 InvalidInsertRequestSnafu {
1001 reason: format!(
1002 "column count mismatch, columns: {}, values: {}",
1003 column_count,
1004 r.values.len()
1005 )
1006 }
1007 );
1008 Ok(())
1009 })?;
1010 }
1011 Ok(())
1012}
1013
/// Builds a `CreateTableExpr` for `table` from the column schemas of an insert request.
///
/// Thin wrapper around `expr_helper::create_table_expr_by_column_schemas`, always passing
/// `None` as its last argument.
fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1021
/// Result of `Inserter::create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Ids of the involved tables for which `is_ttl_instant_table()` returned true.
    instant_table_ids: HashSet<TableId>,
    /// Infos of all tables involved in the requests, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1029
/// Insert requests to be mirrored to flownodes, grouped by destination flownode.
struct FlowMirrorTask {
    /// The requests to send to each flownode peer.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count added to the pending-rows gauge when the task is detached.
    num_rows: usize,
}
1034
1035impl FlowMirrorTask {
1036 async fn new(
1037 cache: &TableFlownodeSetCacheRef,
1038 requests: impl Iterator<Item = &RegionInsertRequest>,
1039 ) -> Result<Self> {
1040 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1041 HashMap::new();
1042 let mut num_rows = 0;
1043
1044 for req in requests {
1045 let table_id = RegionId::from_u64(req.region_id).table_id();
1046 match src_table_reqs.get_mut(&table_id) {
1047 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1048 Some(None) => continue,
1050 _ => {
1051 let peers = cache
1053 .get(table_id)
1054 .await
1055 .context(RequestInsertsSnafu)?
1056 .unwrap_or_default()
1057 .values()
1058 .cloned()
1059 .collect::<HashSet<_>>()
1060 .into_iter()
1061 .collect::<Vec<_>>();
1062
1063 if !peers.is_empty() {
1064 let mut reqs = RegionInsertRequests::default();
1065 reqs.requests.push(req.clone());
1066 num_rows += reqs
1067 .requests
1068 .iter()
1069 .map(|r| r.rows.as_ref().unwrap().rows.len())
1070 .sum::<usize>();
1071 src_table_reqs.insert(table_id, Some((peers, reqs)));
1072 } else {
1073 src_table_reqs.insert(table_id, None);
1075 }
1076 }
1077 }
1078 }
1079
1080 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1081
1082 for (_table_id, (peers, reqs)) in src_table_reqs
1083 .into_iter()
1084 .filter_map(|(k, v)| v.map(|v| (k, v)))
1085 {
1086 if peers.len() == 1 {
1087 inserts
1089 .entry(peers[0].clone())
1090 .or_default()
1091 .requests
1092 .extend(reqs.requests);
1093 continue;
1094 } else {
1095 for flownode in peers {
1097 inserts
1098 .entry(flownode.clone())
1099 .or_default()
1100 .requests
1101 .extend(reqs.requests.clone());
1102 }
1103 }
1104 }
1105
1106 Ok(Self {
1107 requests: inserts,
1108 num_rows,
1109 })
1110 }
1111
1112 fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1113 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1114 for (peer, inserts) in self.requests {
1115 let node_manager = node_manager.clone();
1116 common_runtime::spawn_global(async move {
1117 let result = node_manager
1118 .flownode(&peer)
1119 .await
1120 .handle_inserts(inserts)
1121 .await
1122 .context(RequestInsertsSnafu);
1123
1124 match result {
1125 Ok(resp) => {
1126 let affected_rows = resp.affected_rows;
1127 crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1128 crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1129 }
1130 Err(err) => {
1131 error!(err; "Failed to insert data into flownode {}", peer);
1132 }
1133 }
1134 });
1135 }
1136
1137 Ok(())
1138 }
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use std::sync::Arc;
1144
1145 use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
1146 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1147 use common_meta::cache::new_table_flownode_set_cache;
1148 use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
1149 use common_meta::test_util::MockDatanodeManager;
1150 use datatypes::data_type::ConcreteDataType;
1151 use datatypes::schema::ColumnSchema;
1152 use moka::future::Cache;
1153 use session::context::QueryContext;
1154 use table::dist_table::DummyDataSource;
1155 use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
1156 use table::TableRef;
1157
1158 use super::*;
1159 use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
1160
    /// Builds an in-memory table named `test_table` with two columns: a millisecond timestamp
    /// time index called `ts_name` and a nullable float64 column called `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        // No primary key; column index 1 (the float64 column) is the only value column.
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        // The data source is a dummy: the test only inspects the table's schema.
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }
1204
    /// With `accommodate_existing_schema` and `is_single_value` both set, a request whose
    /// timestamp and field columns are named differently from the table's must be renamed to
    /// match the table, and no alter expression must be produced.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request uses column names that differ from the table's on purpose.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    GrpcColumnSchema {
                        column_name: "ts_wrong".to_string(),
                        datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    },
                    GrpcColumnSchema {
                        column_name: "field_wrong".to_string(),
                        datatype: api::v1::ColumnDataType::Float64 as i32,
                        semantic_type: SemanticType::Field as i32,
                        ..Default::default()
                    },
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        // The inserter is built on in-memory/mocked components.
        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        // Both mismatching columns are renamed, so nothing is left to add.
        assert!(alter_expr.is_none());

        // The request schema now carries the table's column names.
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
1260}