// operator/insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
32    TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
39use common_query::Output;
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::prelude::*;
48use snafu::ResultExt;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
56use store_api::storage::{RegionId, TableId};
57use table::metadata::TableInfo;
58use table::requests::{
59    InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
60    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
61};
62use table::table_reference::TableReference;
63use table::TableRef;
64
65use crate::error::{
66    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
67    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
68};
69use crate::expr_helper;
70use crate::region_req_factory::RegionRequestFactory;
71use crate::req_convert::common::preprocess_row_insert_requests;
72use crate::req_convert::insert::{
73    fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
74};
75use crate::statement::StatementExecutor;
76
77pub struct Inserter {
78    catalog_manager: CatalogManagerRef,
79    pub(crate) partition_manager: PartitionRuleManagerRef,
80    pub(crate) node_manager: NodeManagerRef,
81    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
82}
83
84pub type InserterRef = Arc<Inserter>;
85
/// Hint for the kind of table to create automatically when an insert targets a table that
/// does not exist yet.
#[derive(Debug, Clone)]
pub enum AutoCreateTableType {
    /// A logical table backed by the physical table with the given name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows by `last_non_null` strategy.
    LastNonNull,
    /// A trace table. Span tables get skipping indexes and default partition rules on
    /// `trace_id`; the trace services table is created with append mode disabled.
    Trace,
}

impl AutoCreateTableType {
    /// Returns a short static label for this type (used as a metric label value).
    fn as_str(&self) -> &'static str {
        match self {
            AutoCreateTableType::Logical(_) => "logical",
            AutoCreateTableType::Physical => "physical",
            AutoCreateTableType::Log => "log",
            AutoCreateTableType::LastNonNull => "last_non_null",
            AutoCreateTableType::Trace => "trace",
        }
    }
}
112
113/// Split insert requests into normal and instant requests.
114///
115/// Where instant requests are requests with ttl=instant,
116/// and normal requests are requests with ttl set to other values.
117///
118/// This is used to split requests for different processing.
119#[derive(Clone)]
120pub struct InstantAndNormalInsertRequests {
121    /// Requests with normal ttl.
122    pub normal_requests: RegionInsertRequests,
123    /// Requests with ttl=instant.
124    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
125    pub instant_requests: RegionInsertRequests,
126}
127
128impl Inserter {
129    pub fn new(
130        catalog_manager: CatalogManagerRef,
131        partition_manager: PartitionRuleManagerRef,
132        node_manager: NodeManagerRef,
133        table_flownode_set_cache: TableFlownodeSetCacheRef,
134    ) -> Self {
135        Self {
136            catalog_manager,
137            partition_manager,
138            node_manager,
139            table_flownode_set_cache,
140        }
141    }
142
143    pub async fn handle_column_inserts(
144        &self,
145        requests: InsertRequests,
146        ctx: QueryContextRef,
147        statement_executor: &StatementExecutor,
148    ) -> Result<Output> {
149        let row_inserts = ColumnToRow::convert(requests)?;
150        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
151            .await
152    }
153
154    /// Handles row inserts request and creates a physical table on demand.
155    pub async fn handle_row_inserts(
156        &self,
157        mut requests: RowInsertRequests,
158        ctx: QueryContextRef,
159        statement_executor: &StatementExecutor,
160        accommodate_existing_schema: bool,
161        is_single_value: bool,
162    ) -> Result<Output> {
163        preprocess_row_insert_requests(&mut requests.inserts)?;
164        self.handle_row_inserts_with_create_type(
165            requests,
166            ctx,
167            statement_executor,
168            AutoCreateTableType::Physical,
169            accommodate_existing_schema,
170            is_single_value,
171        )
172        .await
173    }
174
175    /// Handles row inserts request and creates a log table on demand.
176    pub async fn handle_log_inserts(
177        &self,
178        requests: RowInsertRequests,
179        ctx: QueryContextRef,
180        statement_executor: &StatementExecutor,
181    ) -> Result<Output> {
182        self.handle_row_inserts_with_create_type(
183            requests,
184            ctx,
185            statement_executor,
186            AutoCreateTableType::Log,
187            false,
188            false,
189        )
190        .await
191    }
192
193    pub async fn handle_trace_inserts(
194        &self,
195        requests: RowInsertRequests,
196        ctx: QueryContextRef,
197        statement_executor: &StatementExecutor,
198    ) -> Result<Output> {
199        self.handle_row_inserts_with_create_type(
200            requests,
201            ctx,
202            statement_executor,
203            AutoCreateTableType::Trace,
204            false,
205            false,
206        )
207        .await
208    }
209
210    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
211    pub async fn handle_last_non_null_inserts(
212        &self,
213        requests: RowInsertRequests,
214        ctx: QueryContextRef,
215        statement_executor: &StatementExecutor,
216        accommodate_existing_schema: bool,
217        is_single_value: bool,
218    ) -> Result<Output> {
219        self.handle_row_inserts_with_create_type(
220            requests,
221            ctx,
222            statement_executor,
223            AutoCreateTableType::LastNonNull,
224            accommodate_existing_schema,
225            is_single_value,
226        )
227        .await
228    }
229
230    /// Handles row inserts request with specified [AutoCreateTableType].
231    async fn handle_row_inserts_with_create_type(
232        &self,
233        mut requests: RowInsertRequests,
234        ctx: QueryContextRef,
235        statement_executor: &StatementExecutor,
236        create_type: AutoCreateTableType,
237        accommodate_existing_schema: bool,
238        is_single_value: bool,
239    ) -> Result<Output> {
240        // remove empty requests
241        requests.inserts.retain(|req| {
242            req.rows
243                .as_ref()
244                .map(|r| !r.rows.is_empty())
245                .unwrap_or_default()
246        });
247        validate_column_count_match(&requests)?;
248
249        let CreateAlterTableResult {
250            instant_table_ids,
251            table_infos,
252        } = self
253            .create_or_alter_tables_on_demand(
254                &mut requests,
255                &ctx,
256                create_type,
257                statement_executor,
258                accommodate_existing_schema,
259                is_single_value,
260            )
261            .await?;
262
263        let name_to_info = table_infos
264            .values()
265            .map(|info| (info.name.clone(), info.clone()))
266            .collect::<HashMap<_, _>>();
267        let inserts = RowToRegion::new(
268            name_to_info,
269            instant_table_ids,
270            self.partition_manager.as_ref(),
271        )
272        .convert(requests)
273        .await?;
274
275        self.do_request(inserts, &table_infos, &ctx).await
276    }
277
278    /// Handles row inserts request with metric engine.
279    pub async fn handle_metric_row_inserts(
280        &self,
281        mut requests: RowInsertRequests,
282        ctx: QueryContextRef,
283        statement_executor: &StatementExecutor,
284        physical_table: String,
285    ) -> Result<Output> {
286        // remove empty requests
287        requests.inserts.retain(|req| {
288            req.rows
289                .as_ref()
290                .map(|r| !r.rows.is_empty())
291                .unwrap_or_default()
292        });
293        validate_column_count_match(&requests)?;
294
295        // check and create physical table
296        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
297            .await?;
298
299        // check and create logical tables
300        let CreateAlterTableResult {
301            instant_table_ids,
302            table_infos,
303        } = self
304            .create_or_alter_tables_on_demand(
305                &mut requests,
306                &ctx,
307                AutoCreateTableType::Logical(physical_table.to_string()),
308                statement_executor,
309                true,
310                true,
311            )
312            .await?;
313        let name_to_info = table_infos
314            .values()
315            .map(|info| (info.name.clone(), info.clone()))
316            .collect::<HashMap<_, _>>();
317        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
318            .convert(requests)
319            .await?;
320
321        self.do_request(inserts, &table_infos, &ctx).await
322    }
323
324    pub async fn handle_table_insert(
325        &self,
326        request: TableInsertRequest,
327        ctx: QueryContextRef,
328    ) -> Result<Output> {
329        let catalog = request.catalog_name.as_str();
330        let schema = request.schema_name.as_str();
331        let table_name = request.table_name.as_str();
332        let table = self.get_table(catalog, schema, table_name).await?;
333        let table = table.with_context(|| TableNotFoundSnafu {
334            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
335        })?;
336        let table_info = table.table_info();
337
338        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
339            .convert(request)
340            .await?;
341
342        let table_infos =
343            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
344
345        self.do_request(inserts, &table_infos, &ctx).await
346    }
347
348    pub async fn handle_statement_insert(
349        &self,
350        insert: &Insert,
351        ctx: &QueryContextRef,
352    ) -> Result<Output> {
353        let (inserts, table_info) =
354            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
355                .convert(insert, ctx)
356                .await?;
357
358        let table_infos =
359            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
360
361        self.do_request(inserts, &table_infos, ctx).await
362    }
363}
364
365impl Inserter {
366    async fn do_request(
367        &self,
368        requests: InstantAndNormalInsertRequests,
369        table_infos: &HashMap<TableId, Arc<TableInfo>>,
370        ctx: &QueryContextRef,
371    ) -> Result<Output> {
372        // Fill impure default values in the request
373        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
374
375        let write_cost = write_meter!(
376            ctx.current_catalog(),
377            ctx.current_schema(),
378            requests,
379            ctx.channel() as u8
380        );
381        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
382            tracing_context: TracingContext::from_current_span().to_w3c(),
383            dbname: ctx.get_db_string(),
384            ..Default::default()
385        });
386
387        let InstantAndNormalInsertRequests {
388            normal_requests,
389            instant_requests,
390        } = requests;
391
392        // Mirror requests for source table to flownode asynchronously
393        let flow_mirror_task = FlowMirrorTask::new(
394            &self.table_flownode_set_cache,
395            normal_requests
396                .requests
397                .iter()
398                .chain(instant_requests.requests.iter()),
399        )
400        .await?;
401        flow_mirror_task.detach(self.node_manager.clone())?;
402
403        // Write requests to datanode and wait for response
404        let write_tasks = self
405            .group_requests_by_peer(normal_requests)
406            .await?
407            .into_iter()
408            .map(|(peer, inserts)| {
409                let node_manager = self.node_manager.clone();
410                let request = request_factory.build_insert(inserts);
411                common_runtime::spawn_global(async move {
412                    node_manager
413                        .datanode(&peer)
414                        .await
415                        .handle(request)
416                        .await
417                        .context(RequestInsertsSnafu)
418                })
419            });
420        let results = future::try_join_all(write_tasks)
421            .await
422            .context(JoinTaskSnafu)?;
423        let affected_rows = results
424            .into_iter()
425            .map(|resp| resp.map(|r| r.affected_rows))
426            .sum::<Result<AffectedRows>>()?;
427        crate::metrics::DIST_INGEST_ROW_COUNT
428            .with_label_values(&[ctx.get_db_string().as_str()])
429            .inc_by(affected_rows as u64);
430        Ok(Output::new(
431            OutputData::AffectedRows(affected_rows),
432            OutputMeta::new_with_cost(write_cost as _),
433        ))
434    }
435
436    async fn group_requests_by_peer(
437        &self,
438        requests: RegionInsertRequests,
439    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
440        // group by region ids first to reduce repeatedly call `find_region_leader`
441        // TODO(discord9): determine if a addition clone is worth it
442        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
443        for req in requests.requests {
444            let region_id = RegionId::from_u64(req.region_id);
445            requests_per_region
446                .entry(region_id)
447                .or_default()
448                .requests
449                .push(req);
450        }
451
452        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
453
454        for (region_id, reqs) in requests_per_region {
455            let peer = self
456                .partition_manager
457                .find_region_leader(region_id)
458                .await
459                .context(FindRegionLeaderSnafu)?;
460            inserts
461                .entry(peer)
462                .or_default()
463                .requests
464                .extend(reqs.requests);
465        }
466
467        Ok(inserts)
468    }
469
470    /// Creates or alter tables on demand:
471    /// - if table does not exist, create table by inferred CreateExpr
472    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
473    ///
474    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
475    /// This mapping is used in the conversion of RowToRegion.
476    ///
477    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
478    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
479    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
480    /// remote write. This will modify the `RowInsertRequests` in place.
481    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
482    async fn create_or_alter_tables_on_demand(
483        &self,
484        requests: &mut RowInsertRequests,
485        ctx: &QueryContextRef,
486        auto_create_table_type: AutoCreateTableType,
487        statement_executor: &StatementExecutor,
488        accommodate_existing_schema: bool,
489        is_single_value: bool,
490    ) -> Result<CreateAlterTableResult> {
491        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
492            .with_label_values(&[auto_create_table_type.as_str()])
493            .start_timer();
494
495        let catalog = ctx.current_catalog();
496        let schema = ctx.current_schema();
497
498        let mut table_infos = HashMap::new();
499        // If `auto_create_table` hint is disabled, skip creating/altering tables.
500        let auto_create_table_hint = ctx
501            .extension(AUTO_CREATE_TABLE_KEY)
502            .map(|v| v.parse::<bool>())
503            .transpose()
504            .map_err(|_| {
505                InvalidInsertRequestSnafu {
506                    reason: "`auto_create_table` hint must be a boolean",
507                }
508                .build()
509            })?
510            .unwrap_or(true);
511        if !auto_create_table_hint {
512            let mut instant_table_ids = HashSet::new();
513            for req in &requests.inserts {
514                let table = self
515                    .get_table(catalog, &schema, &req.table_name)
516                    .await?
517                    .context(InvalidInsertRequestSnafu {
518                        reason: format!(
519                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
520                            req.table_name
521                        ),
522                    })?;
523                let table_info = table.table_info();
524                if table_info.is_ttl_instant_table() {
525                    instant_table_ids.insert(table_info.table_id());
526                }
527                table_infos.insert(table_info.table_id(), table.table_info());
528            }
529            let ret = CreateAlterTableResult {
530                instant_table_ids,
531                table_infos,
532            };
533            return Ok(ret);
534        }
535
536        let mut create_tables = vec![];
537        let mut alter_tables = vec![];
538        let mut instant_table_ids = HashSet::new();
539
540        for req in &mut requests.inserts {
541            match self.get_table(catalog, &schema, &req.table_name).await? {
542                Some(table) => {
543                    let table_info = table.table_info();
544                    if table_info.is_ttl_instant_table() {
545                        instant_table_ids.insert(table_info.table_id());
546                    }
547                    table_infos.insert(table_info.table_id(), table.table_info());
548                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
549                        req,
550                        &table,
551                        ctx,
552                        accommodate_existing_schema,
553                        is_single_value,
554                    )? {
555                        alter_tables.push(alter_expr);
556                    }
557                }
558                None => {
559                    let create_expr =
560                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
561                    create_tables.push(create_expr);
562                }
563            }
564        }
565
566        match auto_create_table_type {
567            AutoCreateTableType::Logical(_) => {
568                if !create_tables.is_empty() {
569                    // Creates logical tables in batch.
570                    let tables = self
571                        .create_logical_tables(create_tables, ctx, statement_executor)
572                        .await?;
573
574                    for table in tables {
575                        let table_info = table.table_info();
576                        if table_info.is_ttl_instant_table() {
577                            instant_table_ids.insert(table_info.table_id());
578                        }
579                        table_infos.insert(table_info.table_id(), table.table_info());
580                    }
581                }
582                if !alter_tables.is_empty() {
583                    // Alter logical tables in batch.
584                    statement_executor
585                        .alter_logical_tables(alter_tables, ctx.clone())
586                        .await?;
587                }
588            }
589            AutoCreateTableType::Physical
590            | AutoCreateTableType::Log
591            | AutoCreateTableType::LastNonNull => {
592                // note that auto create table shouldn't be ttl instant table
593                // for it's a very unexpected behavior and should be set by user explicitly
594                for create_table in create_tables {
595                    let table = self
596                        .create_physical_table(create_table, None, ctx, statement_executor)
597                        .await?;
598                    let table_info = table.table_info();
599                    if table_info.is_ttl_instant_table() {
600                        instant_table_ids.insert(table_info.table_id());
601                    }
602                    table_infos.insert(table_info.table_id(), table.table_info());
603                }
604                for alter_expr in alter_tables.into_iter() {
605                    statement_executor
606                        .alter_table_inner(alter_expr, ctx.clone())
607                        .await?;
608                }
609            }
610
611            AutoCreateTableType::Trace => {
612                let trace_table_name = ctx
613                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
614                    .unwrap_or(TRACE_TABLE_NAME);
615
616                // note that auto create table shouldn't be ttl instant table
617                // for it's a very unexpected behavior and should be set by user explicitly
618                for mut create_table in create_tables {
619                    if create_table.table_name == trace_services_table_name(trace_table_name) {
620                        // Disable append mode for trace services table since it requires upsert behavior.
621                        create_table
622                            .table_options
623                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
624                        let table = self
625                            .create_physical_table(create_table, None, ctx, statement_executor)
626                            .await?;
627                        let table_info = table.table_info();
628                        if table_info.is_ttl_instant_table() {
629                            instant_table_ids.insert(table_info.table_id());
630                        }
631                        table_infos.insert(table_info.table_id(), table.table_info());
632                    } else {
633                        // prebuilt partition rules for uuid data: see the function
634                        // for more information
635                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
636                            .context(CreatePartitionRulesSnafu)?;
637                        // add skip index to
638                        // - trace_id: when searching by trace id
639                        // - parent_span_id: when searching root span
640                        // - span_name: when searching certain types of span
641                        let index_columns =
642                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
643                        for index_column in index_columns {
644                            if let Some(col) = create_table
645                                .column_defs
646                                .iter_mut()
647                                .find(|c| c.name == index_column)
648                            {
649                                col.options =
650                                    options_from_skipping(&SkippingIndexOptions::default())
651                                        .context(ColumnOptionsSnafu)?;
652                            } else {
653                                warn!(
654                                    "Column {} not found when creating index for trace table: {}.",
655                                    index_column, create_table.table_name
656                                );
657                            }
658                        }
659
660                        // use table_options to mark table model version
661                        create_table.table_options.insert(
662                            TABLE_DATA_MODEL.to_string(),
663                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
664                        );
665
666                        let table = self
667                            .create_physical_table(
668                                create_table,
669                                Some(partitions),
670                                ctx,
671                                statement_executor,
672                            )
673                            .await?;
674                        let table_info = table.table_info();
675                        if table_info.is_ttl_instant_table() {
676                            instant_table_ids.insert(table_info.table_id());
677                        }
678                        table_infos.insert(table_info.table_id(), table.table_info());
679                    }
680                }
681                for alter_expr in alter_tables.into_iter() {
682                    statement_executor
683                        .alter_table_inner(alter_expr, ctx.clone())
684                        .await?;
685                }
686            }
687        }
688
689        Ok(CreateAlterTableResult {
690            instant_table_ids,
691            table_infos,
692        })
693    }
694
695    async fn create_physical_table_on_demand(
696        &self,
697        ctx: &QueryContextRef,
698        physical_table: String,
699        statement_executor: &StatementExecutor,
700    ) -> Result<()> {
701        let catalog_name = ctx.current_catalog();
702        let schema_name = ctx.current_schema();
703
704        // check if exist
705        if self
706            .get_table(catalog_name, &schema_name, &physical_table)
707            .await?
708            .is_some()
709        {
710            return Ok(());
711        }
712
713        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
714        info!("Physical metric table `{table_reference}` does not exist, try creating table");
715
716        // schema with timestamp and field column
717        let default_schema = vec![
718            ColumnSchema {
719                column_name: GREPTIME_TIMESTAMP.to_string(),
720                datatype: ColumnDataType::TimestampMillisecond as _,
721                semantic_type: SemanticType::Timestamp as _,
722                datatype_extension: None,
723                options: None,
724            },
725            ColumnSchema {
726                column_name: GREPTIME_VALUE.to_string(),
727                datatype: ColumnDataType::Float64 as _,
728                semantic_type: SemanticType::Field as _,
729                datatype_extension: None,
730                options: None,
731            },
732        ];
733        let create_table_expr =
734            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
735
736        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
737        create_table_expr
738            .table_options
739            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
740
741        // create physical table
742        let res = statement_executor
743            .create_table_inner(create_table_expr, None, ctx.clone())
744            .await;
745
746        match res {
747            Ok(_) => {
748                info!("Successfully created table {table_reference}",);
749                Ok(())
750            }
751            Err(err) => {
752                error!(err; "Failed to create table {table_reference}");
753                Err(err)
754            }
755        }
756    }
757
758    async fn get_table(
759        &self,
760        catalog: &str,
761        schema: &str,
762        table: &str,
763    ) -> Result<Option<TableRef>> {
764        self.catalog_manager
765            .table(catalog, schema, table, None)
766            .await
767            .context(CatalogSnafu)
768    }
769
770    fn get_create_table_expr_on_demand(
771        &self,
772        req: &RowInsertRequest,
773        create_type: &AutoCreateTableType,
774        ctx: &QueryContextRef,
775    ) -> Result<CreateTableExpr> {
776        let mut table_options = std::collections::HashMap::with_capacity(4);
777        fill_table_options_for_create(&mut table_options, create_type, ctx);
778
779        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
780            // engine should be metric engine when creating logical tables.
781            METRIC_ENGINE_NAME
782        } else {
783            default_engine()
784        };
785
786        let schema = ctx.current_schema();
787        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
788        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
789        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
790        let mut create_table_expr =
791            build_create_table_expr(&table_ref, request_schema, engine_name)?;
792
793        info!("Table `{table_ref}` does not exist, try creating table");
794        create_table_expr.table_options.extend(table_options);
795        Ok(create_table_expr)
796    }
797
798    /// Returns an alter table expression if it finds new columns in the request.
799    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
800    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
801    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
802    /// for more details.
803    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
804    /// input `req`.
805    fn get_alter_table_expr_on_demand(
806        &self,
807        req: &mut RowInsertRequest,
808        table: &TableRef,
809        ctx: &QueryContextRef,
810        accommodate_existing_schema: bool,
811        is_single_value: bool,
812    ) -> Result<Option<AlterTableExpr>> {
813        let catalog_name = ctx.current_catalog();
814        let schema_name = ctx.current_schema();
815        let table_name = table.table_info().name.clone();
816
817        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
818        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
819        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
820        let Some(mut add_columns) = add_columns else {
821            return Ok(None);
822        };
823
824        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
825        if accommodate_existing_schema {
826            let table_schema = table.schema();
827            // Find timestamp column name
828            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
829            // Find field column name if there is only one and `is_single_value` is true.
830            let mut field_col_name = None;
831            if is_single_value {
832                let mut multiple_field_cols = false;
833                table.field_columns().for_each(|col| {
834                    if field_col_name.is_none() {
835                        field_col_name = Some(col.name.clone());
836                    } else {
837                        multiple_field_cols = true;
838                    }
839                });
840                if multiple_field_cols {
841                    field_col_name = None;
842                }
843            }
844
845            // Update column name in request schema for Timestamp/Field columns
846            if let Some(rows) = req.rows.as_mut() {
847                for col in &mut rows.schema {
848                    match col.semantic_type {
849                        x if x == SemanticType::Timestamp as i32 => {
850                            if let Some(ref ts_name) = ts_col_name {
851                                if col.column_name != *ts_name {
852                                    col.column_name = ts_name.clone();
853                                }
854                            }
855                        }
856                        x if x == SemanticType::Field as i32 => {
857                            if let Some(ref field_name) = field_col_name {
858                                if col.column_name != *field_name {
859                                    col.column_name = field_name.clone();
860                                }
861                            }
862                        }
863                        _ => {}
864                    }
865                }
866            }
867
868            // Only keep columns that are tags or non-single field.
869            add_columns.add_columns.retain(|col| {
870                let def = col.column_def.as_ref().unwrap();
871                def.semantic_type == SemanticType::Tag as i32
872                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
873            });
874
875            if add_columns.add_columns.is_empty() {
876                return Ok(None);
877            }
878        }
879
880        Ok(Some(AlterTableExpr {
881            catalog_name: catalog_name.to_string(),
882            schema_name: schema_name.to_string(),
883            table_name: table_name.to_string(),
884            kind: Some(Kind::AddColumns(add_columns)),
885        }))
886    }
887
888    /// Creates a table with options.
889    async fn create_physical_table(
890        &self,
891        mut create_table_expr: CreateTableExpr,
892        partitions: Option<Partitions>,
893        ctx: &QueryContextRef,
894        statement_executor: &StatementExecutor,
895    ) -> Result<TableRef> {
896        {
897            let table_ref = TableReference::full(
898                &create_table_expr.catalog_name,
899                &create_table_expr.schema_name,
900                &create_table_expr.table_name,
901            );
902
903            info!("Table `{table_ref}` does not exist, try creating table");
904        }
905        let res = statement_executor
906            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
907            .await;
908
909        let table_ref = TableReference::full(
910            &create_table_expr.catalog_name,
911            &create_table_expr.schema_name,
912            &create_table_expr.table_name,
913        );
914
915        match res {
916            Ok(table) => {
917                info!(
918                    "Successfully created table {} with options: {:?}",
919                    table_ref, create_table_expr.table_options,
920                );
921                Ok(table)
922            }
923            Err(err) => {
924                error!(err; "Failed to create table {}", table_ref);
925                Err(err)
926            }
927        }
928    }
929
930    async fn create_logical_tables(
931        &self,
932        create_table_exprs: Vec<CreateTableExpr>,
933        ctx: &QueryContextRef,
934        statement_executor: &StatementExecutor,
935    ) -> Result<Vec<TableRef>> {
936        let res = statement_executor
937            .create_logical_tables(&create_table_exprs, ctx.clone())
938            .await;
939
940        match res {
941            Ok(res) => {
942                info!("Successfully created logical tables");
943                Ok(res)
944            }
945            Err(err) => {
946                let failed_tables = create_table_exprs
947                    .into_iter()
948                    .map(|expr| {
949                        format!(
950                            "{}.{}.{}",
951                            expr.catalog_name, expr.schema_name, expr.table_name
952                        )
953                    })
954                    .collect::<Vec<_>>();
955                error!(
956                    err;
957                    "Failed to create logical tables {:?}",
958                    failed_tables
959                );
960                Err(err)
961            }
962        }
963    }
964
965    pub fn node_manager(&self) -> &NodeManagerRef {
966        &self.node_manager
967    }
968
969    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
970        &self.partition_manager
971    }
972}
973
974fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
975    for request in &requests.inserts {
976        let rows = request.rows.as_ref().unwrap();
977        let column_count = rows.schema.len();
978        rows.rows.iter().try_for_each(|r| {
979            ensure!(
980                r.values.len() == column_count,
981                InvalidInsertRequestSnafu {
982                    reason: format!(
983                        "column count mismatch, columns: {}, values: {}",
984                        column_count,
985                        r.values.len()
986                    )
987                }
988            );
989            Ok(())
990        })?;
991    }
992    Ok(())
993}
994
995/// Fill table options for a new table by create type.
996pub fn fill_table_options_for_create(
997    table_options: &mut std::collections::HashMap<String, String>,
998    create_type: &AutoCreateTableType,
999    ctx: &QueryContextRef,
1000) {
1001    for key in VALID_TABLE_OPTION_KEYS {
1002        if let Some(value) = ctx.extension(key) {
1003            table_options.insert(key.to_string(), value.to_string());
1004        }
1005    }
1006
1007    match create_type {
1008        AutoCreateTableType::Logical(physical_table) => {
1009            table_options.insert(
1010                LOGICAL_TABLE_METADATA_KEY.to_string(),
1011                physical_table.to_string(),
1012            );
1013        }
1014        AutoCreateTableType::Physical => {
1015            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1016                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1017            }
1018            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1019                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1020            }
1021        }
1022        // Set append_mode to true for log table.
1023        // because log tables should keep rows with the same ts and tags.
1024        AutoCreateTableType::Log => {
1025            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1026        }
1027        AutoCreateTableType::LastNonNull => {
1028            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1029        }
1030        AutoCreateTableType::Trace => {
1031            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1032        }
1033    }
1034}
1035
1036pub fn build_create_table_expr(
1037    table: &TableReference,
1038    request_schema: &[ColumnSchema],
1039    engine: &str,
1040) -> Result<CreateTableExpr> {
1041    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1042}
1043
/// Result of `create_or_alter_tables_on_demand`.
///
/// NOTE(review): the code that fills these fields is outside this view; the field docs
/// restate the original intent and should be confirmed against it.
struct CreateAlterTableResult {
    /// Ids of the tables whose TTL is `instant`.
    instant_table_ids: HashSet<TableId>,
    /// Table info keyed by table id. The original comment describes these as the created
    /// tables — NOTE(review): confirm whether pre-existing/altered tables are included too.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1051
/// Region insert requests to be mirrored to flownodes, grouped by receiving flownode peer.
struct FlowMirrorTask {
    /// Requests to send, keyed by the flownode peer that receives them.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count added to `DIST_MIRROR_PENDING_ROW_COUNT` when the task is detached.
    num_rows: usize,
}
1056
1057impl FlowMirrorTask {
1058    async fn new(
1059        cache: &TableFlownodeSetCacheRef,
1060        requests: impl Iterator<Item = &RegionInsertRequest>,
1061    ) -> Result<Self> {
1062        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1063            HashMap::new();
1064        let mut num_rows = 0;
1065
1066        for req in requests {
1067            let table_id = RegionId::from_u64(req.region_id).table_id();
1068            match src_table_reqs.get_mut(&table_id) {
1069                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1070                // already know this is not source table
1071                Some(None) => continue,
1072                _ => {
1073                    // dedup peers
1074                    let peers = cache
1075                        .get(table_id)
1076                        .await
1077                        .context(RequestInsertsSnafu)?
1078                        .unwrap_or_default()
1079                        .values()
1080                        .cloned()
1081                        .collect::<HashSet<_>>()
1082                        .into_iter()
1083                        .collect::<Vec<_>>();
1084
1085                    if !peers.is_empty() {
1086                        let mut reqs = RegionInsertRequests::default();
1087                        reqs.requests.push(req.clone());
1088                        num_rows += reqs
1089                            .requests
1090                            .iter()
1091                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1092                            .sum::<usize>();
1093                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1094                    } else {
1095                        // insert a empty entry to avoid repeat query
1096                        src_table_reqs.insert(table_id, None);
1097                    }
1098                }
1099            }
1100        }
1101
1102        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1103
1104        for (_table_id, (peers, reqs)) in src_table_reqs
1105            .into_iter()
1106            .filter_map(|(k, v)| v.map(|v| (k, v)))
1107        {
1108            if peers.len() == 1 {
1109                // fast path, zero copy
1110                inserts
1111                    .entry(peers[0].clone())
1112                    .or_default()
1113                    .requests
1114                    .extend(reqs.requests);
1115                continue;
1116            } else {
1117                // TODO(discord9): need to split requests to multiple flownodes
1118                for flownode in peers {
1119                    inserts
1120                        .entry(flownode.clone())
1121                        .or_default()
1122                        .requests
1123                        .extend(reqs.requests.clone());
1124                }
1125            }
1126        }
1127
1128        Ok(Self {
1129            requests: inserts,
1130            num_rows,
1131        })
1132    }
1133
1134    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1135        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1136        for (peer, inserts) in self.requests {
1137            let node_manager = node_manager.clone();
1138            common_runtime::spawn_global(async move {
1139                let result = node_manager
1140                    .flownode(&peer)
1141                    .await
1142                    .handle_inserts(inserts)
1143                    .await
1144                    .context(RequestInsertsSnafu);
1145
1146                match result {
1147                    Ok(resp) => {
1148                        let affected_rows = resp.affected_rows;
1149                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1150                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1151                    }
1152                    Err(err) => {
1153                        error!(err; "Failed to insert data into flownode {}", peer);
1154                    }
1155                }
1156            });
1157        }
1158
1159        Ok(())
1160    }
1161}
1162
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
    use table::TableRef;

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a `mito` table named `test_table` (id 1, default catalog/schema) whose schema
    /// is a non-null millisecond time index column `ts_name` followed by a nullable float64
    /// field column `field_name`, backed by a `DummyDataSource`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        // No tag columns; column 1 (the field) is the only value column.
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With `accommodate_existing_schema` and `is_single_value` both true, a request whose
    /// timestamp and field columns are named differently from the table's must have them
    /// renamed in place and must not produce an alter expression.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request uses different names for timestamp and field columns
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        // The inserter is wired to an empty in-memory catalog and mocked backends; the method
        // under test only reads `req`, `table` and `ctx`.
        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request's schema should have updated names for timestamp and field columns
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}