1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32 TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33 trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57 APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58 TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64 AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65 TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77 ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
/// Routes insert requests to datanode regions: resolves target tables
/// (auto-creating or altering them on demand), converts rows to per-region
/// requests, and mirrors inserts to flownodes for flow processing.
pub struct Inserter {
    /// Resolves tables and their metadata from the catalog.
    catalog_manager: CatalogManagerRef,
    /// Maps regions to partitions and region leaders.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Client handle to datanodes and flownodes.
    pub(crate) node_manager: NodeManagerRef,
    /// Cache of table id -> flownode set, used for mirroring inserts to flows.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}

/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
89
/// How a missing target table should be auto-created on first insert; also
/// serves as a metrics label via [`AutoCreateTableType::as_str`].
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// Metric-engine logical table; the payload is the backing physical table name.
    Logical(String),
    /// Plain physical table in the default engine.
    Physical,
    /// Log table (created with append mode enabled).
    Log,
    /// Table created with `merge_mode = last_non_null`.
    LastNonNull,
    /// Trace table (partitioned by trace id, trace-v1 data model).
    Trace,
}
104
105impl AutoCreateTableType {
106 fn as_str(&self) -> &'static str {
107 match self {
108 AutoCreateTableType::Logical(_) => "logical",
109 AutoCreateTableType::Physical => "physical",
110 AutoCreateTableType::Log => "log",
111 AutoCreateTableType::LastNonNull => "last_non_null",
112 AutoCreateTableType::Trace => "trace",
113 }
114 }
115}
116
/// Region insert requests split by the target table's TTL class.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests for tables with a normal TTL; these are written to datanodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests for `ttl = instant` tables; these are only mirrored to
    /// flownodes and never written to datanodes.
    pub instant_requests: RegionInsertRequests,
}
131
132impl Inserter {
    /// Creates a new [`Inserter`] from its collaborating components.
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        table_flownode_set_cache: TableFlownodeSetCacheRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
        }
    }
146
    /// Handles inserts in the column-oriented gRPC format by converting them
    /// to row format and delegating to [`Self::handle_row_inserts`].
    pub async fn handle_column_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        let row_inserts = ColumnToRow::convert(requests)?;
        // Column inserts neither accommodate an existing schema nor assume a
        // single value column.
        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
            .await
    }
157
    /// Handles row inserts, auto-creating missing tables as plain physical
    /// tables.
    ///
    /// `accommodate_existing_schema`: rename the request's timestamp (and,
    /// with `is_single_value`, field) columns to match an existing table
    /// instead of altering it (see [`Self::get_alter_table_expr_on_demand`]).
    pub async fn handle_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        // Normalize the raw requests before any table lookups.
        preprocess_row_insert_requests(&mut requests.inserts)?;
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Physical,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }
178
    /// Handles log row inserts; tables auto-created on this path use append
    /// mode (see [`fill_table_options_for_create`]).
    pub async fn handle_log_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Log,
            false,
            false,
        )
        .await
    }
196
    /// Handles trace row inserts; missing tables are auto-created with the
    /// trace-specific layout (partitioning, skipping indexes, trace-v1 model).
    pub async fn handle_trace_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Trace,
            false,
            false,
        )
        .await
    }
213
    /// Handles row inserts whose auto-created tables use
    /// `merge_mode = last_non_null`.
    pub async fn handle_last_non_null_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::LastNonNull,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }
233
234 async fn handle_row_inserts_with_create_type(
236 &self,
237 mut requests: RowInsertRequests,
238 ctx: QueryContextRef,
239 statement_executor: &StatementExecutor,
240 create_type: AutoCreateTableType,
241 accommodate_existing_schema: bool,
242 is_single_value: bool,
243 ) -> Result<Output> {
244 requests.inserts.retain(|req| {
246 req.rows
247 .as_ref()
248 .map(|r| !r.rows.is_empty())
249 .unwrap_or_default()
250 });
251 validate_column_count_match(&requests)?;
252
253 let CreateAlterTableResult {
254 instant_table_ids,
255 table_infos,
256 } = self
257 .create_or_alter_tables_on_demand(
258 &mut requests,
259 &ctx,
260 create_type,
261 statement_executor,
262 accommodate_existing_schema,
263 is_single_value,
264 )
265 .await?;
266
267 let name_to_info = table_infos
268 .values()
269 .map(|info| (info.name.clone(), info.clone()))
270 .collect::<HashMap<_, _>>();
271 let inserts = RowToRegion::new(
272 name_to_info,
273 instant_table_ids,
274 self.partition_manager.as_ref(),
275 )
276 .convert(requests)
277 .await?;
278
279 self.do_request(inserts, &table_infos, &ctx).await
280 }
281
282 pub async fn handle_metric_row_inserts(
284 &self,
285 mut requests: RowInsertRequests,
286 ctx: QueryContextRef,
287 statement_executor: &StatementExecutor,
288 physical_table: String,
289 ) -> Result<Output> {
290 requests.inserts.retain(|req| {
292 req.rows
293 .as_ref()
294 .map(|r| !r.rows.is_empty())
295 .unwrap_or_default()
296 });
297 validate_column_count_match(&requests)?;
298
299 self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
301 .await?;
302
303 let CreateAlterTableResult {
305 instant_table_ids,
306 table_infos,
307 } = self
308 .create_or_alter_tables_on_demand(
309 &mut requests,
310 &ctx,
311 AutoCreateTableType::Logical(physical_table.clone()),
312 statement_executor,
313 true,
314 true,
315 )
316 .await?;
317 let name_to_info = table_infos
318 .values()
319 .map(|info| (info.name.clone(), info.clone()))
320 .collect::<HashMap<_, _>>();
321 let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
322 .convert(requests)
323 .await?;
324
325 self.do_request(inserts, &table_infos, &ctx).await
326 }
327
328 pub async fn handle_table_insert(
329 &self,
330 request: TableInsertRequest,
331 ctx: QueryContextRef,
332 ) -> Result<Output> {
333 let catalog = request.catalog_name.as_str();
334 let schema = request.schema_name.as_str();
335 let table_name = request.table_name.as_str();
336 let table = self.get_table(catalog, schema, table_name).await?;
337 let table = table.with_context(|| TableNotFoundSnafu {
338 table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
339 })?;
340 let table_info = table.table_info();
341
342 let inserts = TableToRegion::new(&table_info, &self.partition_manager)
343 .convert(request)
344 .await?;
345
346 let table_infos =
347 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
348
349 self.do_request(inserts, &table_infos, &ctx).await
350 }
351
352 pub async fn handle_statement_insert(
353 &self,
354 insert: &Insert,
355 ctx: &QueryContextRef,
356 statement_executor: &StatementExecutor,
357 ) -> Result<Output> {
358 let (inserts, table_info) =
359 StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
360 .convert(insert, ctx, statement_executor)
361 .await?;
362
363 let table_infos =
364 HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
365
366 self.do_request(inserts, &table_infos, ctx).await
367 }
368}
369
370impl Inserter {
    /// Dispatches prepared region insert requests: fills impure default
    /// values, mirrors all requests to flownodes, writes normal requests to
    /// their region leaders in parallel, and aggregates affected rows.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Materialize non-constant default column values before metering.
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Both normal and instant requests are mirrored to flownodes; instant
        // requests are NOT written to datanodes below.
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        // Fire-and-forget: mirroring runs on detached background tasks.
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write to all region leaders concurrently, one task per peer.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Sum affected rows, short-circuiting on the first datanode error.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT
            .with_label_values(&[ctx.get_db_string().as_str()])
            .inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
440
441 async fn group_requests_by_peer(
442 &self,
443 requests: RegionInsertRequests,
444 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
445 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
448 for req in requests.requests {
449 let region_id = RegionId::from_u64(req.region_id);
450 requests_per_region
451 .entry(region_id)
452 .or_default()
453 .requests
454 .push(req);
455 }
456
457 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
458
459 for (region_id, reqs) in requests_per_region {
460 let peer = self
461 .partition_manager
462 .find_region_leader(region_id)
463 .await
464 .context(FindRegionLeaderSnafu)?;
465 inserts
466 .entry(peer)
467 .or_default()
468 .requests
469 .extend(reqs.requests);
470 }
471
472 Ok(inserts)
473 }
474
    /// Creates or alters every table referenced by `requests`, as needed.
    ///
    /// Returns the ids of `ttl = instant` tables plus up-to-date infos of all
    /// touched tables. When the session's `auto_create_table` hint is `false`,
    /// all target tables must already exist. `accommodate_existing_schema` and
    /// `is_single_value` control how incoming schemas are reconciled with
    /// existing tables (see [`Self::get_alter_table_expr_on_demand`]).
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // The `auto_create_table` session hint defaults to `true`.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-creation disabled: every target table must already exist.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        // Tables we alter below need their info re-fetched afterwards so
        // callers see the post-ALTER schema.
        let mut need_refresh_table_infos = HashSet::new();
        let mut instant_table_ids = HashSet::new();

        // Classify each request: existing table (possibly needing an ALTER to
        // add columns) or a brand-new table needing a CREATE.
        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                        need_refresh_table_infos.insert((
                            catalog.to_string(),
                            schema.clone(),
                            req.table_name.clone(),
                        ));
                    } else {
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                // Logical tables are created and altered in batches.
                if !create_tables.is_empty() {
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Physical-style tables are created and altered one by one.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                // The trace table name may be overridden per session.
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name)
                        || create_table.table_name == trace_operations_table_name(trace_table_name)
                    {
                        // Auxiliary services/operations tables: deduplicating
                        // (non-append) and without TTL.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        create_table.table_options.remove(TTL_KEY);

                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // Main trace table: partition by the hex trace id.
                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
                            .context(CreatePartitionRulesSnafu)?;
                        // Put skipping indexes on the columns commonly used
                        // for trace lookups.
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // Tag the table as using the trace-v1 data model.
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                Some(partitions),
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        // Re-fetch the info of altered tables so the returned map reflects
        // their new schemas.
        for (catalog, schema, table_name) in need_refresh_table_infos {
            let table = self
                .get_table(&catalog, &schema, &table_name)
                .await?
                .context(TableNotFoundSnafu {
                    table_name: common_catalog::format_full_table_name(
                        &catalog,
                        &schema,
                        &table_name,
                    ),
                })?;
            let table_info = table.table_info();
            table_infos.insert(table_info.table_id(), table.table_info());
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }
727
    /// Ensures the metric-engine physical table `physical_table` exists,
    /// creating it with a minimal timestamp-plus-value schema when missing.
    async fn create_physical_table_on_demand(
        &self,
        ctx: &QueryContextRef,
        physical_table: String,
        statement_executor: &StatementExecutor,
    ) -> Result<()> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        // Fast path: nothing to do if the table already exists.
        if self
            .get_table(catalog_name, &schema_name, &physical_table)
            .await?
            .is_some()
        {
            return Ok(());
        }

        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
        info!("Physical metric table `{table_reference}` does not exist, try creating table");

        // Minimal physical schema: a millisecond time index plus one f64 field.
        let default_schema = vec![
            ColumnSchema {
                column_name: greptime_timestamp().to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: greptime_value().to_string(),
                datatype: ColumnDataType::Float64 as _,
                semantic_type: SemanticType::Field as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let create_table_expr =
            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

        // Mark the table as a metric-engine physical table.
        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        create_table_expr
            .table_options
            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

        let res = statement_executor
            .create_table_inner(create_table_expr, None, ctx.clone())
            .await;

        match res {
            Ok(_) => {
                info!("Successfully created table {table_reference}",);
                Ok(())
            }
            Err(err) => {
                error!(err; "Failed to create table {table_reference}");
                Err(err)
            }
        }
    }
790
    /// Looks a table up in the catalog; `Ok(None)` means the table does not
    /// exist.
    async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)
    }
802
803 fn get_create_table_expr_on_demand(
804 &self,
805 req: &RowInsertRequest,
806 create_type: &AutoCreateTableType,
807 ctx: &QueryContextRef,
808 ) -> Result<CreateTableExpr> {
809 let mut table_options = std::collections::HashMap::with_capacity(4);
810 fill_table_options_for_create(&mut table_options, create_type, ctx);
811
812 let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
813 METRIC_ENGINE_NAME
815 } else {
816 default_engine()
817 };
818
819 let schema = ctx.current_schema();
820 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
821 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
823 let mut create_table_expr =
824 build_create_table_expr(&table_ref, request_schema, engine_name)?;
825
826 info!("Table `{table_ref}` does not exist, try creating table");
827 create_table_expr.table_options.extend(table_options);
828 Ok(create_table_expr)
829 }
830
    /// Returns an `AlterTableExpr` adding the request's missing columns to an
    /// existing table, or `None` when no alteration is needed.
    ///
    /// With `accommodate_existing_schema`, the request is adapted to the
    /// table rather than the reverse: the request's timestamp column (and,
    /// when `is_single_value`, its field columns) are renamed in place to the
    /// table's existing column names, and only genuinely new columns are kept
    /// in the ALTER.
    fn get_alter_table_expr_on_demand(
        &self,
        req: &mut RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
        let Some(mut add_columns) = add_columns else {
            // Every request column already exists in the table.
            return Ok(None);
        };

        if accommodate_existing_schema {
            let table_schema = table.schema();
            // The table's existing time index column name, if any.
            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
            // The table's field column name — only kept when there is exactly
            // one field column; ambiguous (multiple) fields disable renaming.
            let mut field_col_name = None;
            if is_single_value {
                let mut multiple_field_cols = false;
                table.field_columns().for_each(|col| {
                    if field_col_name.is_none() {
                        field_col_name = Some(col.name.clone());
                    } else {
                        multiple_field_cols = true;
                    }
                });
                if multiple_field_cols {
                    field_col_name = None;
                }
            }

            // Rewrite the request's timestamp/field column names in place to
            // match the table's existing names.
            if let Some(rows) = req.rows.as_mut() {
                for col in &mut rows.schema {
                    match col.semantic_type {
                        x if x == SemanticType::Timestamp as i32 => {
                            if let Some(ref ts_name) = ts_col_name
                                && col.column_name != *ts_name
                            {
                                col.column_name = ts_name.clone();
                            }
                        }
                        x if x == SemanticType::Field as i32 => {
                            if let Some(ref field_name) = field_col_name
                                && col.column_name != *field_name
                            {
                                col.column_name = field_name.clone();
                            }
                        }
                        _ => {}
                    }
                }
            }

            // After renaming, only keep additions that still make sense: new
            // tags always; new fields only when no single field column exists
            // to map onto.
            add_columns.add_columns.retain(|col| {
                let def = col.column_def.as_ref().unwrap();
                def.semantic_type == SemanticType::Tag as i32
                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
            });

            if add_columns.add_columns.is_empty() {
                return Ok(None);
            }
        }

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.clone(),
            table_name: table_name.clone(),
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }
920
    /// Creates a single physical table from `create_table_expr`, logging the
    /// outcome either way and returning the created table handle.
    async fn create_physical_table(
        &self,
        mut create_table_expr: CreateTableExpr,
        partitions: Option<Partitions>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<TableRef> {
        {
            // Scoped so this shared borrow of `create_table_expr` ends before
            // `create_table_inner` takes a mutable borrow below.
            let table_ref = TableReference::full(
                &create_table_expr.catalog_name,
                &create_table_expr.schema_name,
                &create_table_expr.table_name,
            );

            info!("Table `{table_ref}` does not exist, try creating table");
        }
        let res = statement_executor
            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
            .await;

        // Rebuild the reference for result logging, now that the mutable
        // borrow has ended.
        let table_ref = TableReference::full(
            &create_table_expr.catalog_name,
            &create_table_expr.schema_name,
            &create_table_expr.table_name,
        );

        match res {
            Ok(table) => {
                info!(
                    "Successfully created table {} with options: {:?}",
                    table_ref, create_table_expr.table_options,
                );
                Ok(table)
            }
            Err(err) => {
                error!(err; "Failed to create table {}", table_ref);
                Err(err)
            }
        }
    }
962
963 async fn create_logical_tables(
964 &self,
965 create_table_exprs: Vec<CreateTableExpr>,
966 ctx: &QueryContextRef,
967 statement_executor: &StatementExecutor,
968 ) -> Result<Vec<TableRef>> {
969 let res = statement_executor
970 .create_logical_tables(&create_table_exprs, ctx.clone())
971 .await;
972
973 match res {
974 Ok(res) => {
975 info!("Successfully created logical tables");
976 Ok(res)
977 }
978 Err(err) => {
979 let failed_tables = create_table_exprs
980 .into_iter()
981 .map(|expr| {
982 format!(
983 "{}.{}.{}",
984 expr.catalog_name, expr.schema_name, expr.table_name
985 )
986 })
987 .collect::<Vec<_>>();
988 error!(
989 err;
990 "Failed to create logical tables {:?}",
991 failed_tables
992 );
993 Err(err)
994 }
995 }
996 }
997
    /// Returns the node manager used to reach datanodes and flownodes.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }

    /// Returns the partition rule manager used for region routing.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
1005}
1006
1007fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
1008 for request in &requests.inserts {
1009 let rows = request.rows.as_ref().unwrap();
1010 let column_count = rows.schema.len();
1011 rows.rows.iter().try_for_each(|r| {
1012 ensure!(
1013 r.values.len() == column_count,
1014 InvalidInsertRequestSnafu {
1015 reason: format!(
1016 "column count mismatch, columns: {}, values: {}",
1017 column_count,
1018 r.values.len()
1019 )
1020 }
1021 );
1022 Ok(())
1023 })?;
1024 }
1025 Ok(())
1026}
1027
1028pub fn fill_table_options_for_create(
1030 table_options: &mut std::collections::HashMap<String, String>,
1031 create_type: &AutoCreateTableType,
1032 ctx: &QueryContextRef,
1033) {
1034 for key in VALID_TABLE_OPTION_KEYS {
1035 if let Some(value) = ctx.extension(key) {
1036 table_options.insert(key.to_string(), value.to_string());
1037 }
1038 }
1039
1040 match create_type {
1041 AutoCreateTableType::Logical(physical_table) => {
1042 table_options.insert(
1043 LOGICAL_TABLE_METADATA_KEY.to_string(),
1044 physical_table.clone(),
1045 );
1046 }
1047 AutoCreateTableType::Physical => {
1048 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1049 table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1050 }
1051 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1052 table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1053 }
1054 if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1055 table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1056 table_options.insert(
1058 COMPACTION_TYPE.to_string(),
1059 COMPACTION_TYPE_TWCS.to_string(),
1060 );
1061 }
1062 }
1063 AutoCreateTableType::Log => {
1066 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1067 }
1068 AutoCreateTableType::LastNonNull => {
1069 table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1070 }
1071 AutoCreateTableType::Trace => {
1072 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1073 }
1074 }
1075}
1076
/// Builds a `CreateTableExpr` for `table` whose columns are derived from the
/// gRPC `request_schema`, targeting the given storage `engine`.
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1084
/// Result of [`Inserter::create_or_alter_tables_on_demand`].
struct CreateAlterTableResult {
    /// Table ids of tables whose TTL is `instant`.
    instant_table_ids: HashSet<TableId>,
    /// Infos of every touched table, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}

/// A batch of region inserts to mirror to flownodes, grouped by peer.
struct FlowMirrorTask {
    /// Inserts to deliver, keyed by destination flownode peer.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Number of source rows being mirrored, used for metrics accounting.
    num_rows: usize,
}
1097
1098impl FlowMirrorTask {
1099 async fn new(
1100 cache: &TableFlownodeSetCacheRef,
1101 requests: impl Iterator<Item = &RegionInsertRequest>,
1102 ) -> Result<Self> {
1103 let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1104 HashMap::new();
1105 let mut num_rows = 0;
1106
1107 for req in requests {
1108 let table_id = RegionId::from_u64(req.region_id).table_id();
1109 match src_table_reqs.get_mut(&table_id) {
1110 Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1111 Some(None) => continue,
1113 _ => {
1114 let peers = cache
1116 .get(table_id)
1117 .await
1118 .context(RequestInsertsSnafu)?
1119 .unwrap_or_default()
1120 .values()
1121 .cloned()
1122 .collect::<HashSet<_>>()
1123 .into_iter()
1124 .collect::<Vec<_>>();
1125
1126 if !peers.is_empty() {
1127 let mut reqs = RegionInsertRequests::default();
1128 reqs.requests.push(req.clone());
1129 num_rows += reqs
1130 .requests
1131 .iter()
1132 .map(|r| r.rows.as_ref().unwrap().rows.len())
1133 .sum::<usize>();
1134 src_table_reqs.insert(table_id, Some((peers, reqs)));
1135 } else {
1136 src_table_reqs.insert(table_id, None);
1138 }
1139 }
1140 }
1141 }
1142
1143 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1144
1145 for (_table_id, (peers, reqs)) in src_table_reqs
1146 .into_iter()
1147 .filter_map(|(k, v)| v.map(|v| (k, v)))
1148 {
1149 if peers.len() == 1 {
1150 inserts
1152 .entry(peers[0].clone())
1153 .or_default()
1154 .requests
1155 .extend(reqs.requests);
1156 continue;
1157 } else {
1158 for flownode in peers {
1160 inserts
1161 .entry(flownode.clone())
1162 .or_default()
1163 .requests
1164 .extend(reqs.requests.clone());
1165 }
1166 }
1167 }
1168
1169 Ok(Self {
1170 requests: inserts,
1171 num_rows,
1172 })
1173 }
1174
    /// Spawns one detached background task per flownode to deliver the
    /// mirrored inserts; the caller never waits on the mirror writes.
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            let node_manager = node_manager.clone();
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                match result {
                    Ok(resp) => {
                        // Move rows from "pending" to "mirrored" in metrics.
                        let affected_rows = resp.affected_rows;
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
                    }
                    Err(err) => {
                        // Mirroring is best-effort: log the failure and move on.
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }

        Ok(())
    }
1202}
1203
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds an in-memory dummy table with one time index column (`ts_name`)
    /// and one nullable f64 field column (`field_name`).
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With `accommodate_existing_schema` and `is_single_value` enabled, an
    /// insert whose timestamp/field columns have different names than the
    /// table's should be renamed in place rather than producing an ALTER.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // Request intentionally uses mismatching column names.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        // No ALTER should be generated for the mismatched names...
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // ...instead the request's columns are renamed to the table's names.
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}