operator/insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_skipping;
use api::v1::region::{
    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
    RegionRequestHeader,
};
use api::v1::{
    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
    RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
    default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
    TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::schema::SkippingIndexOptions;
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{
    InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
};
use table::table_reference::TableReference;
use table::TableRef;

use crate::error::{
    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_helper;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{
    fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;

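/// Inserts data into tables, creating or altering those tables on demand, and
/// mirrors writes to flow source tables into flownodes.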
pub struct Inserter {
    catalog_manager: CatalogManagerRef,
    pub(crate) partition_manager: PartitionRuleManagerRef,
    pub(crate) node_manager: NodeManagerRef,
    table_flownode_set_cache: TableFlownodeSetCacheRef,
}

pub type InserterRef = Arc<Inserter>;

/// Hint for the table type to create automatically.
#[derive(Clone)]
enum AutoCreateTableType {
    /// A logical table with the physical table name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table, which is append-only.
    Log,
    /// A table that merges rows by the `last_non_null` strategy.
    LastNonNull,
    /// A trace table with default partition rules and skipping indexes built
    /// around `trace_id`.
    Trace,
}

impl AutoCreateTableType {
    fn as_str(&self) -> &'static str {
        match self {
            AutoCreateTableType::Logical(_) => "logical",
            AutoCreateTableType::Physical => "physical",
            AutoCreateTableType::Log => "log",
            AutoCreateTableType::LastNonNull => "last_non_null",
            AutoCreateTableType::Trace => "trace",
        }
    }
}

/// Splits insert requests into normal and instant requests.
///
/// Instant requests are those whose target table has `ttl=instant`;
/// normal requests target tables with any other TTL value.
///
/// The two kinds are routed to different processing paths.
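///
/// A sketch of how the two halves are consumed (see `Inserter::do_request`):
/// both halves are mirrored to flownodes, while only the normal half is written
/// to datanodes. `mirror_to_flownodes` and `write_to_datanodes` below are
/// hypothetical stand-ins for the logic in `do_request`:
///
/// ```ignore
/// let InstantAndNormalInsertRequests { normal_requests, instant_requests } = requests;
/// mirror_to_flownodes(normal_requests.requests.iter().chain(instant_requests.requests.iter()));
/// write_to_datanodes(normal_requests);
/// // `instant_requests` end here: ttl=instant rows never reach a memtable.
/// ```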
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests with a normal ttl.
    pub normal_requests: RegionInsertRequests,
    /// Requests with ttl=instant.
    /// These are discarded immediately at the frontend and never inserted into a
    /// memtable; they are only forwarded to flownodes if needed.
    pub instant_requests: RegionInsertRequests,
}

impl Inserter {
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        table_flownode_set_cache: TableFlownodeSetCacheRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
        }
    }

    pub async fn handle_column_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        let row_inserts = ColumnToRow::convert(requests)?;
        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
            .await
    }

    /// Handles a row-inserts request, creating a physical table on demand.
    pub async fn handle_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        preprocess_row_insert_requests(&mut requests.inserts)?;
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Physical,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }

    /// Handles a row-inserts request, creating a log table on demand.
    pub async fn handle_log_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Log,
            false,
            false,
        )
        .await
    }

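    /// Handles a row-inserts request, creating a trace table on demand.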
    pub async fn handle_trace_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Trace,
            false,
            false,
        )
        .await
    }

    /// Handles a row-inserts request, creating a table with the `last_non_null`
    /// merge mode on demand.
    pub async fn handle_last_non_null_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::LastNonNull,
            accommodate_existing_schema,
            is_single_value,
        )
        .await
    }

    /// Handles a row-inserts request with the specified [AutoCreateTableType].
    async fn handle_row_inserts_with_create_type(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        create_type: AutoCreateTableType,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        // Remove empty requests.
        requests.inserts.retain(|req| {
            req.rows
                .as_ref()
                .map(|r| !r.rows.is_empty())
                .unwrap_or_default()
        });
        validate_column_count_match(&requests)?;

        let CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        } = self
            .create_or_alter_tables_on_demand(
                &mut requests,
                &ctx,
                create_type,
                statement_executor,
                accommodate_existing_schema,
                is_single_value,
            )
            .await?;

        let name_to_info = table_infos
            .values()
            .map(|info| (info.name.clone(), info.clone()))
            .collect::<HashMap<_, _>>();
        let inserts = RowToRegion::new(
            name_to_info,
            instant_table_ids,
            self.partition_manager.as_ref(),
        )
        .convert(requests)
        .await?;

        self.do_request(inserts, &table_infos, &ctx).await
    }

    /// Handles a row-inserts request with the metric engine.
    pub async fn handle_metric_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        physical_table: String,
    ) -> Result<Output> {
        // Remove empty requests.
        requests.inserts.retain(|req| {
            req.rows
                .as_ref()
                .map(|r| !r.rows.is_empty())
                .unwrap_or_default()
        });
        validate_column_count_match(&requests)?;

        // Check for and create the physical table on demand.
        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
            .await?;

        // Check for and create the logical tables on demand.
        let CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        } = self
            .create_or_alter_tables_on_demand(
                &mut requests,
                &ctx,
                AutoCreateTableType::Logical(physical_table.to_string()),
                statement_executor,
                true,
                true,
            )
            .await?;
        let name_to_info = table_infos
            .values()
            .map(|info| (info.name.clone(), info.clone()))
            .collect::<HashMap<_, _>>();
        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
            .convert(requests)
            .await?;

        self.do_request(inserts, &table_infos, &ctx).await
    }

    pub async fn handle_table_insert(
        &self,
        request: TableInsertRequest,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog = request.catalog_name.as_str();
        let schema = request.schema_name.as_str();
        let table_name = request.table_name.as_str();
        let table = self.get_table(catalog, schema, table_name).await?;
        let table = table.with_context(|| TableNotFoundSnafu {
            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
        })?;
        let table_info = table.table_info();

        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
            .convert(request)
            .await?;

        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);

        self.do_request(inserts, &table_infos, &ctx).await
    }

    pub async fn handle_statement_insert(
        &self,
        insert: &Insert,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        let (inserts, table_info) =
            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
                .convert(insert, ctx)
                .await?;

        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);

        self.do_request(inserts, &table_infos, ctx).await
    }
}

impl Inserter {
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Fill impure default values in the request.
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror requests for source tables to flownodes asynchronously.
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write requests to datanodes and wait for the responses.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }

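    /// Groups insert requests by the datanode peer that currently leads each
    /// target region, resolving region leaders through the partition manager.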
    async fn group_requests_by_peer(
        &self,
        requests: RegionInsertRequests,
    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
        // Group by region id first to avoid repeated calls to `find_region_leader`.
        // TODO(discord9): determine if an additional clone is worth it
        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
        for req in requests.requests {
            let region_id = RegionId::from_u64(req.region_id);
            requests_per_region
                .entry(region_id)
                .or_default()
                .requests
                .push(req);
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (region_id, reqs) in requests_per_region {
            let peer = self
                .partition_manager
                .find_region_leader(region_id)
                .await
                .context(FindRegionLeaderSnafu)?;
            inserts
                .entry(peer)
                .or_default()
                .requests
                .extend(reqs.requests);
        }

        Ok(inserts)
    }

    /// Creates or alters tables on demand:
    /// - if a table does not exist, creates it from the inferred `CreateTableExpr`;
    /// - if a table exists, checks whether the schema matches, and if any new
    ///   column is found, alters the table with the inferred `AlterTableExpr`.
    ///
    /// Returns the ids of `ttl=instant` tables along with a mapping from table id
    /// to table info for every table involved in the requests. The mapping is used
    /// in the `RowToRegion` conversion.
    ///
    /// `accommodate_existing_schema` determines whether the existing schema should
    /// override the schema in the requests. It only applies to the TIME INDEX column
    /// and single VALUE columns. This covers the case where the user creates a table
    /// with a custom schema and then inserts data through endpoints that use a fixed
    /// default schema, such as Prometheus remote write. It modifies the
    /// `RowInsertRequests` in place.
    /// `is_single_value` indicates whether the default schema contains only a single
    /// value column, so that it can be accommodated as well.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // If the `auto_create_table` hint is disabled, skip creating/altering tables.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut instant_table_ids = HashSet::new();

        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                if !create_tables.is_empty() {
                    // Creates logical tables in batch.
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    // Alters logical tables in batch.
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Note that an auto-created table shouldn't be a ttl=instant table:
                // that would be very unexpected behavior, so it must be set by the
                // user explicitly.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                // Note that an auto-created table shouldn't be a ttl=instant table:
                // that would be very unexpected behavior, so it must be set by the
                // user explicitly.
                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name) {
                        // Disable append mode for the trace services table, since
                        // it requires upsert behavior.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // Prebuilt partition rules for hex string data (trace ids):
                        // see `partition_rule_for_hexstring` for more information.
                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
                            .context(CreatePartitionRulesSnafu)?;
                        // Add a skipping index to:
                        // - trace_id: for searching by trace id
                        // - parent_span_id: for searching the root span
                        // - service_name: for searching spans of a certain service
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // Use table_options to mark the table data model version.
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                Some(partitions),
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }

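    /// Creates the physical table for the metric engine on demand, with a default
    /// schema consisting of a `greptime_timestamp` time index and a
    /// `greptime_value` field column.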
    async fn create_physical_table_on_demand(
        &self,
        ctx: &QueryContextRef,
        physical_table: String,
        statement_executor: &StatementExecutor,
    ) -> Result<()> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        // Check whether the table already exists.
        if self
            .get_table(catalog_name, &schema_name, &physical_table)
            .await?
            .is_some()
        {
            return Ok(());
        }

        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
        info!("Physical metric table `{table_reference}` does not exist, try creating table");

        // Default schema with a timestamp column and a field column.
        let default_schema = vec![
            ColumnSchema {
                column_name: GREPTIME_TIMESTAMP.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: GREPTIME_VALUE.to_string(),
                datatype: ColumnDataType::Float64 as _,
                semantic_type: SemanticType::Field as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let create_table_expr =
            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        create_table_expr
            .table_options
            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

        // Create the physical table.
        let res = statement_executor
            .create_table_inner(create_table_expr, None, ctx.clone())
            .await;

        match res {
            Ok(_) => {
                info!("Successfully created table {table_reference}");
                Ok(())
            }
            Err(err) => {
                error!(err; "Failed to create table {table_reference}");
                Err(err)
            }
        }
    }

    async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)
    }

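    /// Builds a `CreateTableExpr` for the request's table, wiring in the engine
    /// and table options implied by the [AutoCreateTableType] and by query-context
    /// extensions.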
    fn get_create_table_expr_on_demand(
        &self,
        req: &RowInsertRequest,
        create_type: &AutoCreateTableType,
        ctx: &QueryContextRef,
    ) -> Result<CreateTableExpr> {
        let mut table_options = Vec::with_capacity(4);
        for key in VALID_TABLE_OPTION_KEYS {
            if let Some(value) = ctx.extension(key) {
                table_options.push((key, value));
            }
        }

        let mut engine_name = default_engine();
        match create_type {
            AutoCreateTableType::Logical(physical_table) => {
                engine_name = METRIC_ENGINE_NAME;
                table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
            }
            AutoCreateTableType::Physical => {
                if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
                    table_options.push((APPEND_MODE_KEY, append_mode));
                }
                if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
                    table_options.push((MERGE_MODE_KEY, merge_mode));
                }
            }
            // Set append_mode to true for log tables, because they should keep
            // rows with the same timestamp and tags.
            AutoCreateTableType::Log => {
                table_options.push((APPEND_MODE_KEY, "true"));
            }
            AutoCreateTableType::LastNonNull => {
                table_options.push((MERGE_MODE_KEY, "last_non_null"));
            }
            AutoCreateTableType::Trace => {
                table_options.push((APPEND_MODE_KEY, "true"));
            }
        }

        let schema = ctx.current_schema();
        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let mut create_table_expr =
            build_create_table_expr(&table_ref, request_schema, engine_name)?;

        info!("Table `{table_ref}` does not exist, try creating table");
        for (k, v) in table_options {
            create_table_expr
                .table_options
                .insert(k.to_string(), v.to_string());
        }

        Ok(create_table_expr)
    }

    /// Returns an alter table expression if new columns are found in the request.
    /// When `accommodate_existing_schema` is false, it always adds columns that do
    /// not exist yet.
    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
    /// accommodate the existing schema. See
    /// [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
    /// for more details.
    /// When both `accommodate_existing_schema` and `is_single_value` are true, it
    /// also considers field columns when modifying the input `req`.
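    ///
    /// A sketch of the accommodation behavior (mirrored by
    /// `test_accommodate_existing_schema_logic` below): for a table whose time
    /// index is `my_ts` and whose single field is `my_field`, a request carrying
    /// columns named `ts_wrong`/`field_wrong` is rewritten in place instead of
    /// producing an alter expression:
    ///
    /// ```ignore
    /// let alter_expr =
    ///     inserter.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)?;
    /// assert!(alter_expr.is_none());
    /// // `req` now names `my_ts` / `my_field` in its schema.
    /// ```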
    fn get_alter_table_expr_on_demand(
        &self,
        req: &mut RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
        let Some(mut add_columns) = add_columns else {
            return Ok(None);
        };

        // If `accommodate_existing_schema` is true, update the request schema for
        // timestamp/field columns.
        if accommodate_existing_schema {
            let table_schema = table.schema();
            // Find the timestamp column name.
            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
            // Find the field column name if there is exactly one and `is_single_value` is true.
            let mut field_col_name = None;
            if is_single_value {
                let mut multiple_field_cols = false;
                table.field_columns().for_each(|col| {
                    if field_col_name.is_none() {
                        field_col_name = Some(col.name.clone());
                    } else {
                        multiple_field_cols = true;
                    }
                });
                if multiple_field_cols {
                    field_col_name = None;
                }
            }

            // Update the column names in the request schema for timestamp/field columns.
            if let Some(rows) = req.rows.as_mut() {
                for col in &mut rows.schema {
                    match col.semantic_type {
                        x if x == SemanticType::Timestamp as i32 => {
                            if let Some(ref ts_name) = ts_col_name {
                                if col.column_name != *ts_name {
                                    col.column_name = ts_name.clone();
                                }
                            }
                        }
                        x if x == SemanticType::Field as i32 => {
                            if let Some(ref field_name) = field_col_name {
                                if col.column_name != *field_name {
                                    col.column_name = field_name.clone();
                                }
                            }
                        }
                        _ => {}
                    }
                }
            }

            // Only keep columns that are tags, or fields when no single field
            // column was accommodated.
            add_columns.add_columns.retain(|col| {
                let def = col.column_def.as_ref().unwrap();
                def.semantic_type == SemanticType::Tag as i32
                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
            });

            if add_columns.add_columns.is_empty() {
                return Ok(None);
            }
        }

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.to_string(),
            table_name: table_name.to_string(),
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }

    /// Creates a table with options.
    async fn create_physical_table(
        &self,
        mut create_table_expr: CreateTableExpr,
        partitions: Option<Partitions>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<TableRef> {
        {
            let table_ref = TableReference::full(
                &create_table_expr.catalog_name,
                &create_table_expr.schema_name,
                &create_table_expr.table_name,
            );

            info!("Table `{table_ref}` does not exist, try creating table");
        }
        let res = statement_executor
            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
            .await;

        let table_ref = TableReference::full(
            &create_table_expr.catalog_name,
            &create_table_expr.schema_name,
            &create_table_expr.table_name,
        );

        match res {
            Ok(table) => {
                info!(
                    "Successfully created table {} with options: {:?}",
                    table_ref, create_table_expr.table_options,
                );
                Ok(table)
            }
            Err(err) => {
                error!(err; "Failed to create table {}", table_ref);
                Err(err)
            }
        }
    }

    async fn create_logical_tables(
        &self,
        create_table_exprs: Vec<CreateTableExpr>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Vec<TableRef>> {
        let res = statement_executor
            .create_logical_tables(&create_table_exprs, ctx.clone())
            .await;

        match res {
            Ok(res) => {
                info!("Successfully created logical tables");
                Ok(res)
            }
            Err(err) => {
                let failed_tables = create_table_exprs
                    .into_iter()
                    .map(|expr| {
                        format!(
                            "{}.{}.{}",
                            expr.catalog_name, expr.schema_name, expr.table_name
                        )
                    })
                    .collect::<Vec<_>>();
                error!(
                    err;
                    "Failed to create logical tables {:?}",
                    failed_tables
                );
                Err(err)
            }
        }
    }
}

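/// Ensures that every row in each request carries exactly as many values as its
/// schema has columns.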
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
    for request in &requests.inserts {
        let rows = request.rows.as_ref().unwrap();
        let column_count = rows.schema.len();
        rows.rows.iter().try_for_each(|r| {
            ensure!(
                r.values.len() == column_count,
                InvalidInsertRequestSnafu {
                    reason: format!(
                        "column count mismatch, columns: {}, values: {}",
                        column_count,
                        r.values.len()
                    )
                }
            );
            Ok(())
        })?;
    }
    Ok(())
}

fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}

/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Table ids of `ttl=instant` tables.
    instant_table_ids: HashSet<TableId>,
    /// Infos of all tables involved in the requests, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}

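/// Insert requests that must be mirrored to flownodes because they target flow
/// source tables, grouped by flownode peer.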
struct FlowMirrorTask {
    requests: HashMap<Peer, RegionInsertRequests>,
    num_rows: usize,
}

impl FlowMirrorTask {
    async fn new(
        cache: &TableFlownodeSetCacheRef,
        requests: impl Iterator<Item = &RegionInsertRequest>,
    ) -> Result<Self> {
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();
        let mut num_rows = 0;

        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            match src_table_reqs.get_mut(&table_id) {
                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                // Already known not to be a source table.
                Some(None) => continue,
                _ => {
                    // Deduplicate peers.
                    let peers = cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<HashSet<_>>()
                        .into_iter()
                        .collect::<Vec<_>>();

                    if !peers.is_empty() {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        num_rows += reqs
                            .requests
                            .iter()
                            .map(|r| r.rows.as_ref().unwrap().rows.len())
                            .sum::<usize>();
                        src_table_reqs.insert(table_id, Some((peers, reqs)));
                    } else {
                        // Insert an empty entry to avoid repeated queries.
                        src_table_reqs.insert(table_id, None);
                    }
                }
            }
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (_table_id, (peers, reqs)) in src_table_reqs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
        {
            if peers.len() == 1 {
                // Fast path: zero copy.
                inserts
                    .entry(peers[0].clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
                continue;
            } else {
                // TODO(discord9): need to split requests to multiple flownodes
                for flownode in peers {
                    inserts
                        .entry(flownode.clone())
                        .or_default()
                        .requests
                        .extend(reqs.requests.clone());
                }
            }
        }

        Ok(Self {
            requests: inserts,
            num_rows,
        })
    }

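    /// Spawns the mirroring writes onto the global runtime and returns
    /// immediately; per-flownode failures are logged rather than propagated.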
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            let node_manager = node_manager.clone();
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                match result {
                    Ok(resp) => {
                        let affected_rows = resp.affected_rows;
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
                    }
                    Err(err) => {
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
    use table::TableRef;

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request uses different names for timestamp and field columns.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    GrpcColumnSchema {
                        column_name: "ts_wrong".to_string(),
                        datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    },
                    GrpcColumnSchema {
                        column_name: "field_wrong".to_string(),
                        datatype: api::v1::ColumnDataType::Float64 as i32,
                        semantic_type: SemanticType::Field as i32,
                        ..Default::default()
                    },
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request's schema should have updated names for timestamp and field columns.
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
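
    // An added sanity check for `validate_column_count_match` in the parent
    // module: a minimal sketch that builds requests by hand (the column name
    // here is illustrative only).
    #[test]
    fn test_validate_column_count_match() {
        let make_requests = |values_per_row: usize| RowInsertRequests {
            inserts: vec![RowInsertRequest {
                table_name: "test_table".to_string(),
                rows: Some(Rows {
                    schema: vec![GrpcColumnSchema {
                        column_name: "ts".to_string(),
                        datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    }],
                    rows: vec![api::v1::Row {
                        values: vec![Value::default(); values_per_row],
                    }],
                }),
            }],
        };

        // One schema column with one value per row passes.
        assert!(validate_column_count_match(&make_requests(1)).is_ok());
        // One schema column with two values per row is rejected.
        assert!(validate_column_count_match(&make_requests(2)).is_err());
    }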
}