1use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21 InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22 RegionRequestHeader,
23};
24use api::v1::{
25 AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26 RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31 PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32 TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33 trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54 LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57 APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58 TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64 AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65 TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70 CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71 InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77 ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
/// Distributed insert executor for the frontend.
///
/// Routes row/column/table/statement inserts to the datanode regions that
/// own the data, creating or altering target tables on demand, and mirrors
/// the inserts to flownodes via [`FlowMirrorTask`].
pub struct Inserter {
    // Resolves existing tables by (catalog, schema, table) name.
    catalog_manager: CatalogManagerRef,
    // Maps rows to regions and resolves each region's leader peer.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    // Client handles to the datanodes/flownodes that execute requests.
    pub(crate) node_manager: NodeManagerRef,
    // Cache of table id -> flownode set, used when mirroring inserts to flows.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}

/// Shared, thread-safe handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
89
/// Kind of table an insert may auto-create; also used as a metrics label
/// (see [`AutoCreateTableType::as_str`]).
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A logical table in the metric engine; the payload is the name of the
    /// physical table that backs it.
    Logical(String),
    /// A regular physical table.
    Physical,
    /// A log table (created with append mode enabled).
    Log,
    /// A table created with the `last_non_null` merge mode.
    LastNonNull,
    /// A table storing trace data.
    Trace,
}

impl AutoCreateTableType {
    /// Static label for this create type, suitable for metric label values.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
116
/// Region insert requests split by the TTL class of their target tables.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests targeting tables with a normal (non-instant) TTL; these are
    /// written to the datanodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests targeting `ttl = instant` tables; these are only mirrored to
    /// flownodes and are not written to datanodes (see `do_request`).
    pub instant_requests: RegionInsertRequests,
}
131
132impl Inserter {
    /// Creates a new [`Inserter`] from its collaborating components.
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        table_flownode_set_cache: TableFlownodeSetCacheRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
        }
    }
146
    /// Handles column-oriented insert requests by converting them to row
    /// format and delegating to [`Self::handle_row_inserts`].
    pub async fn handle_column_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        let row_inserts = ColumnToRow::convert(requests)?;
        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
            .await
    }
157
    /// Handles row inserts, auto-creating missing target tables as regular
    /// physical tables.
    ///
    /// * `accommodate_existing_schema`: when true, the requests' timestamp
    ///   (and optionally field) columns may be renamed in place to match an
    ///   existing table's schema instead of altering the table (see
    ///   `get_alter_table_expr_on_demand`).
    /// * `is_single_value`: declares that each request carries at most one
    ///   field column, enabling the field-column renaming above.
    pub async fn handle_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        // Pre-process the raw requests before table creation/conversion.
        preprocess_row_insert_requests(&mut requests.inserts)?;
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Physical,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }
178
    /// Handles row inserts destined for log data: missing target tables are
    /// auto-created with append mode enabled (see
    /// `fill_table_options_for_create`).
    pub async fn handle_log_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Log,
            false,
            false,
        )
        .await
    }
196
    /// Handles row inserts destined for trace data: missing target tables are
    /// auto-created with trace-specific options, skipping indexes and
    /// partition rules (see `create_or_alter_tables_on_demand`).
    pub async fn handle_trace_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Trace,
            false,
            false,
        )
        .await
    }
213
    /// Handles row inserts where missing target tables are auto-created with
    /// the `last_non_null` merge mode.
    ///
    /// See [`Self::handle_row_inserts`] for the meaning of
    /// `accommodate_existing_schema` and `is_single_value`.
    pub async fn handle_last_non_null_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::LastNonNull,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }
233
234 async fn handle_row_inserts_with_create_type(
236 &self,
237 mut requests: RowInsertRequests,
238 ctx: QueryContextRef,
239 statement_executor: &StatementExecutor,
240 create_type: AutoCreateTableType,
241 accommodate_existing_schema: bool,
242 is_single_value: bool,
243 ) -> Result<Output> {
244 requests.inserts.retain(|req| {
246 req.rows
247 .as_ref()
248 .map(|r| !r.rows.is_empty())
249 .unwrap_or_default()
250 });
251 validate_column_count_match(&requests)?;
252
253 let CreateAlterTableResult {
254 instant_table_ids,
255 table_infos,
256 } = self
257 .create_or_alter_tables_on_demand(
258 &mut requests,
259 &ctx,
260 create_type,
261 statement_executor,
262 accommodate_existing_schema,
263 is_single_value,
264 )
265 .await?;
266
267 let name_to_info = table_infos
268 .values()
269 .map(|info| (info.name.clone(), info.clone()))
270 .collect::<HashMap<_, _>>();
271 let inserts = RowToRegion::new(
272 name_to_info,
273 instant_table_ids,
274 self.partition_manager.as_ref(),
275 )
276 .convert(requests)
277 .await?;
278
279 self.do_request(inserts, &table_infos, &ctx).await
280 }
281
    /// Handles row inserts for the metric engine: ensures `physical_table`
    /// exists first, then creates/alters the logical tables targeted by the
    /// requests before writing.
    pub async fn handle_metric_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        physical_table: String,
    ) -> Result<Output> {
        // Requests without any row contribute nothing; drop them up front.
        requests.inserts.retain(|req| {
            req.rows
                .as_ref()
                .map(|r| !r.rows.is_empty())
                .unwrap_or_default()
        });
        validate_column_count_match(&requests)?;

        // The physical table must exist before logical tables can be created
        // on top of it.
        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
            .await?;

        // Create or alter the logical tables the rows target.
        let CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        } = self
            .create_or_alter_tables_on_demand(
                &mut requests,
                &ctx,
                AutoCreateTableType::Logical(physical_table.clone()),
                statement_executor,
                true,
                true,
            )
            .await?;
        // Index table infos by table name for the row -> region conversion.
        let name_to_info = table_infos
            .values()
            .map(|info| (info.name.clone(), info.clone()))
            .collect::<HashMap<_, _>>();
        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
            .convert(requests)
            .await?;

        self.do_request(inserts, &table_infos, &ctx).await
    }
327
    /// Handles an insert request addressed to a single, already-existing
    /// table.
    ///
    /// # Errors
    /// Returns `TableNotFound` when the table cannot be resolved.
    pub async fn handle_table_insert(
        &self,
        request: TableInsertRequest,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog = request.catalog_name.as_str();
        let schema = request.schema_name.as_str();
        let table_name = request.table_name.as_str();
        let table = self.get_table(catalog, schema, table_name).await?;
        let table = table.with_context(|| TableNotFoundSnafu {
            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
        })?;
        let table_info = table.table_info();

        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
            .convert(request)
            .await?;

        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);

        self.do_request(inserts, &table_infos, &ctx).await
    }
350
    /// Handles a SQL `INSERT` statement by converting it directly into region
    /// insert requests for its (single) target table.
    pub async fn handle_statement_insert(
        &self,
        insert: &Insert,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        let (inserts, table_info) =
            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
                .convert(insert, ctx)
                .await?;

        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);

        self.do_request(inserts, &table_infos, ctx).await
    }
365}
366
367impl Inserter {
    /// Dispatches prepared region insert requests: mirrors them to flownodes,
    /// writes the normal (non-instant) requests to the datanode region
    /// leaders in parallel, and returns the summed affected row count.
    ///
    /// Note that instant requests are only mirrored to flownodes; they are
    /// never sent to datanodes.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Resolve impure column defaults on the frontend before fanning out
        // (see `fill_reqs_with_impure_default`).
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        // Meter the write volume against the current catalog/schema.
        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        // All region requests share one header carrying tracing and db info.
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror both normal and instant requests to flownodes; the task is
        // detached so mirroring does not block the write path.
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write to datanodes in parallel, one spawned task per peer.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Sum affected rows across peers; any single peer failure fails the
        // whole request.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT
            .with_label_values(&[ctx.get_db_string().as_str()])
            .inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
437
438 async fn group_requests_by_peer(
439 &self,
440 requests: RegionInsertRequests,
441 ) -> Result<HashMap<Peer, RegionInsertRequests>> {
442 let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
445 for req in requests.requests {
446 let region_id = RegionId::from_u64(req.region_id);
447 requests_per_region
448 .entry(region_id)
449 .or_default()
450 .requests
451 .push(req);
452 }
453
454 let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
455
456 for (region_id, reqs) in requests_per_region {
457 let peer = self
458 .partition_manager
459 .find_region_leader(region_id)
460 .await
461 .context(FindRegionLeaderSnafu)?;
462 inserts
463 .entry(peer)
464 .or_default()
465 .requests
466 .extend(reqs.requests);
467 }
468
469 Ok(inserts)
470 }
471
    /// Ensures every table targeted by `requests` exists with a schema that
    /// can accept the rows, creating or altering tables as needed according to
    /// `auto_create_table_type`.
    ///
    /// Honors the `auto_create_table` session hint: when disabled, all target
    /// tables must already exist. When `accommodate_existing_schema` is true,
    /// requests may be rewritten in place to match an existing table's
    /// timestamp/field column names instead of issuing an `ALTER TABLE`.
    ///
    /// Returns the ids of `ttl = instant` tables plus the (refreshed, when
    /// altered) infos of all target tables.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // The `auto_create_table` hint defaults to true when unset.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-creation disabled: every target table must already exist.
            // Only collect table infos and identify instant-TTL tables.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        // Classify each request: missing table -> create expr; existing table
        // with missing columns -> alter expr (whose info is refreshed later).
        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut need_refresh_table_infos = HashSet::new();
        let mut instant_table_ids = HashSet::new();

        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                        // Re-fetch the table info after the alteration runs.
                        need_refresh_table_infos.insert((
                            catalog.to_string(),
                            schema.clone(),
                            req.table_name.clone(),
                        ));
                    } else {
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        // Execute the collected DDL, specialized by create type.
        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                // Logical tables are created/altered in batches.
                if !create_tables.is_empty() {
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Physical-style tables are created/altered one by one.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                // The trace table name and partition count can be overridden
                // per session.
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                let trace_table_partitions = if let Some(trace_table_partitions) =
                    ctx.extension(TRACE_TABLE_PARTITIONS_HINT_KEY)
                {
                    let p = trace_table_partitions.parse::<u32>().map_err(|_| {
                        InvalidInsertRequestSnafu {
                            reason: format!(
                                "Failed to parse trace_table_partitions: {}",
                                trace_table_partitions
                            ),
                        }
                        .build()
                    })?;
                    Some(p)
                } else {
                    None
                };

                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name)
                        || create_table.table_name == trace_operations_table_name(trace_table_name)
                    {
                        // Auxiliary services/operations tables: non-append
                        // mode and no TTL.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        create_table.table_options.remove(TTL_KEY);

                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // Main trace table: partition on the trace id unless
                        // 0/1 partitions were requested (i.e. no partitioning).
                        let partitions = if matches!(trace_table_partitions, Some(0) | Some(1)) {
                            None
                        } else {
                            let p = partition_rule_for_hexstring(
                                TRACE_ID_COLUMN,
                                trace_table_partitions,
                            )
                            .context(CreatePartitionRulesSnafu)?;
                            Some(p)
                        };

                        // Attach skipping indexes to the well-known trace
                        // lookup columns when present in the request schema.
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // Mark the table as using the trace v1 data model.
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                partitions,
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        // Altered tables have stale infos; re-fetch them post-alteration.
        for (catalog, schema, table_name) in need_refresh_table_infos {
            let table = self
                .get_table(&catalog, &schema, &table_name)
                .await?
                .context(TableNotFoundSnafu {
                    table_name: common_catalog::format_full_table_name(
                        &catalog,
                        &schema,
                        &table_name,
                    ),
                })?;
            let table_info = table.table_info();
            table_infos.insert(table_info.table_id(), table.table_info());
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }
751
    /// Creates the metric engine's physical table named `physical_table` if
    /// it does not already exist.
    ///
    /// The table is created with a minimal default schema (a millisecond
    /// timestamp column plus a float64 value column) and with the
    /// physical-table metadata option set.
    async fn create_physical_table_on_demand(
        &self,
        ctx: &QueryContextRef,
        physical_table: String,
        statement_executor: &StatementExecutor,
    ) -> Result<()> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        // Fast path: nothing to do when the table already exists.
        if self
            .get_table(catalog_name, &schema_name, &physical_table)
            .await?
            .is_some()
        {
            return Ok(());
        }

        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
        info!("Physical metric table `{table_reference}` does not exist, try creating table");

        // Default schema: a millisecond timestamp column and a float64 value
        // field column.
        let default_schema = vec![
            ColumnSchema {
                column_name: greptime_timestamp().to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: greptime_value().to_string(),
                datatype: ColumnDataType::Float64 as _,
                semantic_type: SemanticType::Field as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let create_table_expr =
            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

        // Force the metric engine and flag this table as the physical one.
        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        create_table_expr
            .table_options
            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

        let res = statement_executor
            .create_table_inner(create_table_expr, None, ctx.clone())
            .await;

        match res {
            Ok(_) => {
                info!("Successfully created table {table_reference}",);
                Ok(())
            }
            Err(err) => {
                error!(err; "Failed to create table {table_reference}");
                Err(err)
            }
        }
    }
814
    /// Resolves a table by name through the catalog manager; `Ok(None)` means
    /// the table does not exist.
    async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)
    }
826
827 fn get_create_table_expr_on_demand(
828 &self,
829 req: &RowInsertRequest,
830 create_type: &AutoCreateTableType,
831 ctx: &QueryContextRef,
832 ) -> Result<CreateTableExpr> {
833 let mut table_options = std::collections::HashMap::with_capacity(4);
834 fill_table_options_for_create(&mut table_options, create_type, ctx);
835
836 let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
837 METRIC_ENGINE_NAME
839 } else {
840 default_engine()
841 };
842
843 let schema = ctx.current_schema();
844 let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
845 let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
847 let mut create_table_expr =
848 build_create_table_expr(&table_ref, request_schema, engine_name)?;
849
850 info!("Table `{table_ref}` does not exist, try creating table");
851 create_table_expr.table_options.extend(table_options);
852 Ok(create_table_expr)
853 }
854
    /// Builds the `ALTER TABLE ... ADD COLUMNS` expression required for `req`
    /// to fit the existing `table`, or returns `None` when no change is
    /// needed.
    ///
    /// When `accommodate_existing_schema` is true, the request is adapted to
    /// the table instead of the other way around: the request's timestamp
    /// column (and, when `is_single_value` and the table has exactly one field
    /// column, its field columns) are renamed in place to the table's column
    /// names, and only genuinely new columns are kept in the alteration.
    fn get_alter_table_expr_on_demand(
        &self,
        req: &mut RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
        let Some(mut add_columns) = add_columns else {
            // Request columns all exist already: nothing to alter.
            return Ok(None);
        };

        if accommodate_existing_schema {
            let table_schema = table.schema();
            // The table's existing time index column name.
            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
            // The table's single field column name, only when exactly one
            // exists and the caller declared single-value semantics.
            let mut field_col_name = None;
            if is_single_value {
                let mut multiple_field_cols = false;
                table.field_columns().for_each(|col| {
                    if field_col_name.is_none() {
                        field_col_name = Some(col.name.clone());
                    } else {
                        multiple_field_cols = true;
                    }
                });
                if multiple_field_cols {
                    field_col_name = None;
                }
            }

            // Rename the request's timestamp/field columns in place to match
            // the table's schema.
            if let Some(rows) = req.rows.as_mut() {
                for col in &mut rows.schema {
                    match col.semantic_type {
                        x if x == SemanticType::Timestamp as i32 => {
                            if let Some(ref ts_name) = ts_col_name
                                && col.column_name != *ts_name
                            {
                                col.column_name = ts_name.clone();
                            }
                        }
                        x if x == SemanticType::Field as i32 => {
                            if let Some(ref field_name) = field_col_name
                                && col.column_name != *field_name
                            {
                                col.column_name = field_name.clone();
                            }
                        }
                        _ => {}
                    }
                }
            }

            // Keep only additions that survive the renaming: new tags always,
            // new fields only when they could not be mapped onto the table's
            // single field column.
            add_columns.add_columns.retain(|col| {
                let def = col.column_def.as_ref().unwrap();
                def.semantic_type == SemanticType::Tag as i32
                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
            });

            if add_columns.add_columns.is_empty() {
                return Ok(None);
            }
        }

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.clone(),
            table_name: table_name.clone(),
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }
944
    /// Creates a single physical table from `create_table_expr`, optionally
    /// with partition rules, logging the outcome either way.
    async fn create_physical_table(
        &self,
        mut create_table_expr: CreateTableExpr,
        partitions: Option<Partitions>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<TableRef> {
        // Scoped so this borrow of `create_table_expr` ends before the
        // mutable borrow taken by `create_table_inner` below.
        {
            let table_ref = TableReference::full(
                &create_table_expr.catalog_name,
                &create_table_expr.schema_name,
                &create_table_expr.table_name,
            );

            info!("Table `{table_ref}` does not exist, try creating table");
        }
        let res = statement_executor
            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
            .await;

        // Rebuild the reference for result logging (the expression may have
        // been borrowed mutably above).
        let table_ref = TableReference::full(
            &create_table_expr.catalog_name,
            &create_table_expr.schema_name,
            &create_table_expr.table_name,
        );

        match res {
            Ok(table) => {
                info!(
                    "Successfully created table {} with options: {:?}",
                    table_ref, create_table_expr.table_options,
                );
                Ok(table)
            }
            Err(err) => {
                error!(err; "Failed to create table {}", table_ref);
                Err(err)
            }
        }
    }
986
    /// Creates a batch of logical tables in a single call, logging the full
    /// list of table names on failure (the batch fails as a whole).
    async fn create_logical_tables(
        &self,
        create_table_exprs: Vec<CreateTableExpr>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Vec<TableRef>> {
        let res = statement_executor
            .create_logical_tables(&create_table_exprs, ctx.clone())
            .await;

        match res {
            Ok(res) => {
                info!("Successfully created logical tables");
                Ok(res)
            }
            Err(err) => {
                // Include every requested table in the error log to aid
                // debugging batch failures.
                let failed_tables = create_table_exprs
                    .into_iter()
                    .map(|expr| {
                        format!(
                            "{}.{}.{}",
                            expr.catalog_name, expr.schema_name, expr.table_name
                        )
                    })
                    .collect::<Vec<_>>();
                error!(
                    err;
                    "Failed to create logical tables {:?}",
                    failed_tables
                );
                Err(err)
            }
        }
    }
1021
    /// Returns the node manager used to reach datanodes and flownodes.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }

    /// Returns the partition rule manager used for region routing.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
1029}
1030
1031fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
1032 for request in &requests.inserts {
1033 let rows = request.rows.as_ref().unwrap();
1034 let column_count = rows.schema.len();
1035 rows.rows.iter().try_for_each(|r| {
1036 ensure!(
1037 r.values.len() == column_count,
1038 InvalidInsertRequestSnafu {
1039 reason: format!(
1040 "column count mismatch, columns: {}, values: {}",
1041 column_count,
1042 r.values.len()
1043 )
1044 }
1045 );
1046 Ok(())
1047 })?;
1048 }
1049 Ok(())
1050}
1051
1052pub fn fill_table_options_for_create(
1054 table_options: &mut std::collections::HashMap<String, String>,
1055 create_type: &AutoCreateTableType,
1056 ctx: &QueryContextRef,
1057) {
1058 for key in VALID_TABLE_OPTION_KEYS {
1059 if let Some(value) = ctx.extension(key) {
1060 table_options.insert(key.to_string(), value.to_string());
1061 }
1062 }
1063
1064 match create_type {
1065 AutoCreateTableType::Logical(physical_table) => {
1066 table_options.insert(
1067 LOGICAL_TABLE_METADATA_KEY.to_string(),
1068 physical_table.clone(),
1069 );
1070 }
1071 AutoCreateTableType::Physical => {
1072 if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1073 table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1074 }
1075 if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1076 table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1077 }
1078 if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1079 table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1080 table_options.insert(
1082 COMPACTION_TYPE.to_string(),
1083 COMPACTION_TYPE_TWCS.to_string(),
1084 );
1085 }
1086 }
1087 AutoCreateTableType::Log => {
1090 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1091 }
1092 AutoCreateTableType::LastNonNull => {
1093 table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1094 }
1095 AutoCreateTableType::Trace => {
1096 table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1097 }
1098 }
1099}
1100
/// Builds a `CreateTableExpr` for `table` from the request's column schemas
/// using `engine`, delegating to `expr_helper` (the trailing `None` leaves
/// the helper's optional argument unset — see `expr_helper` for its meaning).
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1108
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    // Ids of target tables with `ttl = instant` (their data is routed as
    // instant requests, not persisted to datanodes).
    instant_table_ids: HashSet<TableId>,
    // Infos of all target tables, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1116
/// Insert requests pre-grouped by target flownode, ready to be mirrored.
struct FlowMirrorTask {
    // Requests to mirror, grouped by flownode peer.
    requests: HashMap<Peer, RegionInsertRequests>,
    // Row count accumulated at build time, used for the pending-row metric.
    num_rows: usize,
}
1121
1122impl FlowMirrorTask {
    /// Groups `requests` by the flownode peers that consume their source
    /// tables, looking up the table -> flownode mapping in `cache`.
    ///
    /// Tables with no associated flownode are remembered as `None` so the
    /// cache is consulted only once per table id.
    async fn new(
        cache: &TableFlownodeSetCacheRef,
        requests: impl Iterator<Item = &RegionInsertRequest>,
    ) -> Result<Self> {
        // table id -> Some((flownode peers, accumulated requests)), or None
        // when the table is known to feed no flow.
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();
        let mut num_rows = 0;

        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            match src_table_reqs.get_mut(&table_id) {
                // Table already known to have flownodes: just accumulate.
                // NOTE(review): rows appended here are not added to
                // `num_rows`, so the pending-row metric may undercount when a
                // table appears in multiple requests — verify intent.
                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                // Known to have no flownodes: skip without re-querying.
                Some(None) => continue,
                // First sighting of this table: query the flownode cache.
                _ => {
                    // Deduplicate peers via a set before collecting.
                    let peers = cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<HashSet<_>>()
                        .into_iter()
                        .collect::<Vec<_>>();

                    if !peers.is_empty() {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        num_rows += reqs
                            .requests
                            .iter()
                            .map(|r| r.rows.as_ref().unwrap().rows.len())
                            .sum::<usize>();
                        src_table_reqs.insert(table_id, Some((peers, reqs)));
                    } else {
                        // Negative entry: this table feeds no flow.
                        src_table_reqs.insert(table_id, None);
                    }
                }
            }
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (_table_id, (peers, reqs)) in src_table_reqs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
        {
            if peers.len() == 1 {
                // Common case: move the requests into the single peer's batch.
                inserts
                    .entry(peers[0].clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
                continue;
            } else {
                // Multiple flownodes consume this table: each gets a copy.
                for flownode in peers {
                    inserts
                        .entry(flownode.clone())
                        .or_default()
                        .requests
                        .extend(reqs.requests.clone());
                }
            }
        }

        Ok(Self {
            requests: inserts,
            num_rows,
        })
    }
1198
    /// Fires the mirroring requests to flownodes on detached background tasks
    /// and returns immediately; per-peer failures are logged, not propagated.
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            let node_manager = node_manager.clone();
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                match result {
                    Ok(resp) => {
                        // Move rows from "pending" to "mirrored" in metrics.
                        let affected_rows = resp.affected_rows;
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
                    }
                    Err(err) => {
                        // Mirroring is best-effort: log and move on.
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }

        Ok(())
    }
1226}
1227
1228#[cfg(test)]
1229mod tests {
1230 use std::sync::Arc;
1231
1232 use api::v1::helper::{field_column_schema, time_index_column_schema};
1233 use api::v1::{RowInsertRequest, Rows, Value};
1234 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1235 use common_meta::cache::new_table_flownode_set_cache;
1236 use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
1237 use common_meta::test_util::MockDatanodeManager;
1238 use datatypes::data_type::ConcreteDataType;
1239 use datatypes::schema::ColumnSchema;
1240 use moka::future::Cache;
1241 use session::context::QueryContext;
1242 use table::TableRef;
1243 use table::dist_table::DummyDataSource;
1244 use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
1245
1246 use super::*;
1247 use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
1248
1249 fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
1250 let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
1251 ColumnSchema::new(
1252 ts_name,
1253 ConcreteDataType::timestamp_millisecond_datatype(),
1254 false,
1255 )
1256 .with_time_index(true),
1257 ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
1258 ])
1259 .unwrap()
1260 .build()
1261 .unwrap();
1262 let meta = TableMetaBuilder::empty()
1263 .schema(Arc::new(schema))
1264 .primary_key_indices(vec![])
1265 .value_indices(vec![1])
1266 .engine("mito")
1267 .next_column_id(0)
1268 .options(Default::default())
1269 .created_on(Default::default())
1270 .build()
1271 .unwrap();
1272 let info = Arc::new(
1273 TableInfoBuilder::default()
1274 .table_id(1)
1275 .table_version(0)
1276 .name("test_table")
1277 .schema_name(DEFAULT_SCHEMA_NAME)
1278 .catalog_name(DEFAULT_CATALOG_NAME)
1279 .desc(None)
1280 .table_type(TableType::Base)
1281 .meta(meta)
1282 .build()
1283 .unwrap(),
1284 );
1285 Arc::new(table::Table::new(
1286 info,
1287 table::metadata::FilterPushDownType::Unsupported,
1288 Arc::new(DummyDataSource),
1289 ))
1290 }
1291
1292 #[tokio::test]
1293 async fn test_accommodate_existing_schema_logic() {
1294 let ts_name = "my_ts";
1295 let field_name = "my_field";
1296 let table = make_table_ref_with_schema(ts_name, field_name);
1297
1298 let mut req = RowInsertRequest {
1300 table_name: "test_table".to_string(),
1301 rows: Some(Rows {
1302 schema: vec![
1303 time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
1304 field_column_schema("field_wrong", ColumnDataType::Float64),
1305 ],
1306 rows: vec![api::v1::Row {
1307 values: vec![Value::default(), Value::default()],
1308 }],
1309 }),
1310 };
1311 let ctx = Arc::new(QueryContext::with(
1312 DEFAULT_CATALOG_NAME,
1313 DEFAULT_SCHEMA_NAME,
1314 ));
1315
1316 let kv_backend = prepare_mocked_backend().await;
1317 let inserter = Inserter::new(
1318 catalog::memory::MemoryCatalogManager::new(),
1319 create_partition_rule_manager(kv_backend.clone()).await,
1320 Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
1321 Arc::new(new_table_flownode_set_cache(
1322 String::new(),
1323 Cache::new(100),
1324 kv_backend.clone(),
1325 )),
1326 );
1327 let alter_expr = inserter
1328 .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
1329 .unwrap();
1330 assert!(alter_expr.is_none());
1331
1332 let req_schema = req.rows.as_ref().unwrap().schema.clone();
1334 assert_eq!(req_schema[0].column_name, ts_name);
1335 assert_eq!(req_schema[1].column_name, field_name);
1336 }
1337}