operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_services_table_name,
33};
34use common_grpc_expr::util::ColumnExpr;
35use common_meta::cache::TableFlownodeSetCacheRef;
36use common_meta::node_manager::{AffectedRows, NodeManagerRef};
37use common_meta::peer::Peer;
38use common_query::Output;
39use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
40use common_telemetry::tracing_context::TracingContext;
41use common_telemetry::{error, info, warn};
42use datatypes::schema::SkippingIndexOptions;
43use futures_util::future;
44use meter_macros::write_meter;
45use partition::manager::PartitionRuleManagerRef;
46use session::context::QueryContextRef;
47use snafu::ResultExt;
48use snafu::prelude::*;
49use sql::partition::partition_rule_for_hexstring;
50use sql::statements::create::Partitions;
51use sql::statements::insert::Insert;
52use store_api::metric_engine_consts::{
53    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
54};
55use store_api::mito_engine_options::{
56    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW,
57};
58use store_api::storage::{RegionId, TableId};
59use table::TableRef;
60use table::metadata::TableInfo;
61use table::requests::{
62    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
63    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
64};
65use table::table_reference::TableReference;
66
67use crate::error::{
68    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
69    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
70};
71use crate::expr_helper;
72use crate::region_req_factory::RegionRequestFactory;
73use crate::req_convert::common::preprocess_row_insert_requests;
74use crate::req_convert::insert::{
75    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
76};
77use crate::statement::StatementExecutor;
78
/// Handles insert requests: converts them into region insert requests and
/// sends those to the datanodes, creating or altering tables on demand.
pub struct Inserter {
    /// Used to look up tables by catalog, schema and table name.
    catalog_manager: CatalogManagerRef,
    /// Used to find the leader peer of a region; also handed to the request converters.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Provides the datanode clients that region insert requests are sent to.
    pub(crate) node_manager: NodeManagerRef,
    /// Handed to `FlowMirrorTask` when inserts are mirrored to flownodes.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
85
/// Reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
87
/// Tells the inserter which kind of table to create when the target table is missing.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A logical table; the payload is the name of its physical table.
    Logical(String),
    /// A physical table.
    Physical,
    /// An append-only log table.
    Log,
    /// A table that merges rows with the `last_non_null` strategy.
    LastNonNull,
    /// A trace table, created with indexes and default partition rules on trace_id.
    Trace,
}

impl AutoCreateTableType {
    /// Returns the static, lower-case name of this variant.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
114
/// Insert requests split into normal and instant requests.
///
/// Instant requests are requests with ttl=instant;
/// normal requests are requests with ttl set to any other value.
///
/// The split lets the two groups be processed differently.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests with normal ttl.
    pub normal_requests: RegionInsertRequests,
    /// Requests with ttl=instant.
    /// They are discarded immediately at the frontend, are never inserted into a memtable,
    /// and are only sent to a flow node if needed.
    pub instant_requests: RegionInsertRequests,
}
129
130impl Inserter {
131    pub fn new(
132        catalog_manager: CatalogManagerRef,
133        partition_manager: PartitionRuleManagerRef,
134        node_manager: NodeManagerRef,
135        table_flownode_set_cache: TableFlownodeSetCacheRef,
136    ) -> Self {
137        Self {
138            catalog_manager,
139            partition_manager,
140            node_manager,
141            table_flownode_set_cache,
142        }
143    }
144
145    pub async fn handle_column_inserts(
146        &self,
147        requests: InsertRequests,
148        ctx: QueryContextRef,
149        statement_executor: &StatementExecutor,
150    ) -> Result<Output> {
151        let row_inserts = ColumnToRow::convert(requests)?;
152        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
153            .await
154    }
155
156    /// Handles row inserts request and creates a physical table on demand.
157    pub async fn handle_row_inserts(
158        &self,
159        mut requests: RowInsertRequests,
160        ctx: QueryContextRef,
161        statement_executor: &StatementExecutor,
162        accommodate_existing_schema: bool,
163        is_single_value: bool,
164    ) -> Result<Output> {
165        preprocess_row_insert_requests(&mut requests.inserts)?;
166        self.handle_row_inserts_with_create_type(
167            requests,
168            ctx,
169            statement_executor,
170            AutoCreateTableType::Physical,
171            accommodate_existing_schema,
172            is_single_value,
173        )
174        .await
175    }
176
177    /// Handles row inserts request and creates a log table on demand.
178    pub async fn handle_log_inserts(
179        &self,
180        requests: RowInsertRequests,
181        ctx: QueryContextRef,
182        statement_executor: &StatementExecutor,
183    ) -> Result<Output> {
184        self.handle_row_inserts_with_create_type(
185            requests,
186            ctx,
187            statement_executor,
188            AutoCreateTableType::Log,
189            false,
190            false,
191        )
192        .await
193    }
194
195    pub async fn handle_trace_inserts(
196        &self,
197        requests: RowInsertRequests,
198        ctx: QueryContextRef,
199        statement_executor: &StatementExecutor,
200    ) -> Result<Output> {
201        self.handle_row_inserts_with_create_type(
202            requests,
203            ctx,
204            statement_executor,
205            AutoCreateTableType::Trace,
206            false,
207            false,
208        )
209        .await
210    }
211
212    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
213    pub async fn handle_last_non_null_inserts(
214        &self,
215        requests: RowInsertRequests,
216        ctx: QueryContextRef,
217        statement_executor: &StatementExecutor,
218        accommodate_existing_schema: bool,
219        is_single_value: bool,
220    ) -> Result<Output> {
221        self.handle_row_inserts_with_create_type(
222            requests,
223            ctx,
224            statement_executor,
225            AutoCreateTableType::LastNonNull,
226            accommodate_existing_schema,
227            is_single_value,
228        )
229        .await
230    }
231
232    /// Handles row inserts request with specified [AutoCreateTableType].
233    async fn handle_row_inserts_with_create_type(
234        &self,
235        mut requests: RowInsertRequests,
236        ctx: QueryContextRef,
237        statement_executor: &StatementExecutor,
238        create_type: AutoCreateTableType,
239        accommodate_existing_schema: bool,
240        is_single_value: bool,
241    ) -> Result<Output> {
242        // remove empty requests
243        requests.inserts.retain(|req| {
244            req.rows
245                .as_ref()
246                .map(|r| !r.rows.is_empty())
247                .unwrap_or_default()
248        });
249        validate_column_count_match(&requests)?;
250
251        let CreateAlterTableResult {
252            instant_table_ids,
253            table_infos,
254        } = self
255            .create_or_alter_tables_on_demand(
256                &mut requests,
257                &ctx,
258                create_type,
259                statement_executor,
260                accommodate_existing_schema,
261                is_single_value,
262            )
263            .await?;
264
265        let name_to_info = table_infos
266            .values()
267            .map(|info| (info.name.clone(), info.clone()))
268            .collect::<HashMap<_, _>>();
269        let inserts = RowToRegion::new(
270            name_to_info,
271            instant_table_ids,
272            self.partition_manager.as_ref(),
273        )
274        .convert(requests)
275        .await?;
276
277        self.do_request(inserts, &table_infos, &ctx).await
278    }
279
280    /// Handles row inserts request with metric engine.
281    pub async fn handle_metric_row_inserts(
282        &self,
283        mut requests: RowInsertRequests,
284        ctx: QueryContextRef,
285        statement_executor: &StatementExecutor,
286        physical_table: String,
287    ) -> Result<Output> {
288        // remove empty requests
289        requests.inserts.retain(|req| {
290            req.rows
291                .as_ref()
292                .map(|r| !r.rows.is_empty())
293                .unwrap_or_default()
294        });
295        validate_column_count_match(&requests)?;
296
297        // check and create physical table
298        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
299            .await?;
300
301        // check and create logical tables
302        let CreateAlterTableResult {
303            instant_table_ids,
304            table_infos,
305        } = self
306            .create_or_alter_tables_on_demand(
307                &mut requests,
308                &ctx,
309                AutoCreateTableType::Logical(physical_table.to_string()),
310                statement_executor,
311                true,
312                true,
313            )
314            .await?;
315        let name_to_info = table_infos
316            .values()
317            .map(|info| (info.name.clone(), info.clone()))
318            .collect::<HashMap<_, _>>();
319        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
320            .convert(requests)
321            .await?;
322
323        self.do_request(inserts, &table_infos, &ctx).await
324    }
325
326    pub async fn handle_table_insert(
327        &self,
328        request: TableInsertRequest,
329        ctx: QueryContextRef,
330    ) -> Result<Output> {
331        let catalog = request.catalog_name.as_str();
332        let schema = request.schema_name.as_str();
333        let table_name = request.table_name.as_str();
334        let table = self.get_table(catalog, schema, table_name).await?;
335        let table = table.with_context(|| TableNotFoundSnafu {
336            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
337        })?;
338        let table_info = table.table_info();
339
340        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
341            .convert(request)
342            .await?;
343
344        let table_infos =
345            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
346
347        self.do_request(inserts, &table_infos, &ctx).await
348    }
349
350    pub async fn handle_statement_insert(
351        &self,
352        insert: &Insert,
353        ctx: &QueryContextRef,
354    ) -> Result<Output> {
355        let (inserts, table_info) =
356            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
357                .convert(insert, ctx)
358                .await?;
359
360        let table_infos =
361            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
362
363        self.do_request(inserts, &table_infos, ctx).await
364    }
365}
366
367impl Inserter {
368    async fn do_request(
369        &self,
370        requests: InstantAndNormalInsertRequests,
371        table_infos: &HashMap<TableId, Arc<TableInfo>>,
372        ctx: &QueryContextRef,
373    ) -> Result<Output> {
374        // Fill impure default values in the request
375        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
376
377        let write_cost = write_meter!(
378            ctx.current_catalog(),
379            ctx.current_schema(),
380            requests,
381            ctx.channel() as u8
382        );
383        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
384            tracing_context: TracingContext::from_current_span().to_w3c(),
385            dbname: ctx.get_db_string(),
386            ..Default::default()
387        });
388
389        let InstantAndNormalInsertRequests {
390            normal_requests,
391            instant_requests,
392        } = requests;
393
394        // Mirror requests for source table to flownode asynchronously
395        let flow_mirror_task = FlowMirrorTask::new(
396            &self.table_flownode_set_cache,
397            normal_requests
398                .requests
399                .iter()
400                .chain(instant_requests.requests.iter()),
401        )
402        .await?;
403        flow_mirror_task.detach(self.node_manager.clone())?;
404
405        // Write requests to datanode and wait for response
406        let write_tasks = self
407            .group_requests_by_peer(normal_requests)
408            .await?
409            .into_iter()
410            .map(|(peer, inserts)| {
411                let node_manager = self.node_manager.clone();
412                let request = request_factory.build_insert(inserts);
413                common_runtime::spawn_global(async move {
414                    node_manager
415                        .datanode(&peer)
416                        .await
417                        .handle(request)
418                        .await
419                        .context(RequestInsertsSnafu)
420                })
421            });
422        let results = future::try_join_all(write_tasks)
423            .await
424            .context(JoinTaskSnafu)?;
425        let affected_rows = results
426            .into_iter()
427            .map(|resp| resp.map(|r| r.affected_rows))
428            .sum::<Result<AffectedRows>>()?;
429        crate::metrics::DIST_INGEST_ROW_COUNT
430            .with_label_values(&[ctx.get_db_string().as_str()])
431            .inc_by(affected_rows as u64);
432        Ok(Output::new(
433            OutputData::AffectedRows(affected_rows),
434            OutputMeta::new_with_cost(write_cost as _),
435        ))
436    }
437
438    async fn group_requests_by_peer(
439        &self,
440        requests: RegionInsertRequests,
441    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
442        // group by region ids first to reduce repeatedly call `find_region_leader`
443        // TODO(discord9): determine if a addition clone is worth it
444        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
445        for req in requests.requests {
446            let region_id = RegionId::from_u64(req.region_id);
447            requests_per_region
448                .entry(region_id)
449                .or_default()
450                .requests
451                .push(req);
452        }
453
454        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
455
456        for (region_id, reqs) in requests_per_region {
457            let peer = self
458                .partition_manager
459                .find_region_leader(region_id)
460                .await
461                .context(FindRegionLeaderSnafu)?;
462            inserts
463                .entry(peer)
464                .or_default()
465                .requests
466                .extend(reqs.requests);
467        }
468
469        Ok(inserts)
470    }
471
472    /// Creates or alter tables on demand:
473    /// - if table does not exist, create table by inferred CreateExpr
474    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
475    ///
476    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
477    /// This mapping is used in the conversion of RowToRegion.
478    ///
479    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
480    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
481    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
482    /// remote write. This will modify the `RowInsertRequests` in place.
483    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
484    async fn create_or_alter_tables_on_demand(
485        &self,
486        requests: &mut RowInsertRequests,
487        ctx: &QueryContextRef,
488        auto_create_table_type: AutoCreateTableType,
489        statement_executor: &StatementExecutor,
490        accommodate_existing_schema: bool,
491        is_single_value: bool,
492    ) -> Result<CreateAlterTableResult> {
493        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
494            .with_label_values(&[auto_create_table_type.as_str()])
495            .start_timer();
496
497        let catalog = ctx.current_catalog();
498        let schema = ctx.current_schema();
499
500        let mut table_infos = HashMap::new();
501        // If `auto_create_table` hint is disabled, skip creating/altering tables.
502        let auto_create_table_hint = ctx
503            .extension(AUTO_CREATE_TABLE_KEY)
504            .map(|v| v.parse::<bool>())
505            .transpose()
506            .map_err(|_| {
507                InvalidInsertRequestSnafu {
508                    reason: "`auto_create_table` hint must be a boolean",
509                }
510                .build()
511            })?
512            .unwrap_or(true);
513        if !auto_create_table_hint {
514            let mut instant_table_ids = HashSet::new();
515            for req in &requests.inserts {
516                let table = self
517                    .get_table(catalog, &schema, &req.table_name)
518                    .await?
519                    .context(InvalidInsertRequestSnafu {
520                        reason: format!(
521                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
522                            req.table_name
523                        ),
524                    })?;
525                let table_info = table.table_info();
526                if table_info.is_ttl_instant_table() {
527                    instant_table_ids.insert(table_info.table_id());
528                }
529                table_infos.insert(table_info.table_id(), table.table_info());
530            }
531            let ret = CreateAlterTableResult {
532                instant_table_ids,
533                table_infos,
534            };
535            return Ok(ret);
536        }
537
538        let mut create_tables = vec![];
539        let mut alter_tables = vec![];
540        let mut instant_table_ids = HashSet::new();
541
542        for req in &mut requests.inserts {
543            match self.get_table(catalog, &schema, &req.table_name).await? {
544                Some(table) => {
545                    let table_info = table.table_info();
546                    if table_info.is_ttl_instant_table() {
547                        instant_table_ids.insert(table_info.table_id());
548                    }
549                    table_infos.insert(table_info.table_id(), table.table_info());
550                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
551                        req,
552                        &table,
553                        ctx,
554                        accommodate_existing_schema,
555                        is_single_value,
556                    )? {
557                        alter_tables.push(alter_expr);
558                    }
559                }
560                None => {
561                    let create_expr =
562                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
563                    create_tables.push(create_expr);
564                }
565            }
566        }
567
568        match auto_create_table_type {
569            AutoCreateTableType::Logical(_) => {
570                if !create_tables.is_empty() {
571                    // Creates logical tables in batch.
572                    let tables = self
573                        .create_logical_tables(create_tables, ctx, statement_executor)
574                        .await?;
575
576                    for table in tables {
577                        let table_info = table.table_info();
578                        if table_info.is_ttl_instant_table() {
579                            instant_table_ids.insert(table_info.table_id());
580                        }
581                        table_infos.insert(table_info.table_id(), table.table_info());
582                    }
583                }
584                if !alter_tables.is_empty() {
585                    // Alter logical tables in batch.
586                    statement_executor
587                        .alter_logical_tables(alter_tables, ctx.clone())
588                        .await?;
589                }
590            }
591            AutoCreateTableType::Physical
592            | AutoCreateTableType::Log
593            | AutoCreateTableType::LastNonNull => {
594                // note that auto create table shouldn't be ttl instant table
595                // for it's a very unexpected behavior and should be set by user explicitly
596                for create_table in create_tables {
597                    let table = self
598                        .create_physical_table(create_table, None, ctx, statement_executor)
599                        .await?;
600                    let table_info = table.table_info();
601                    if table_info.is_ttl_instant_table() {
602                        instant_table_ids.insert(table_info.table_id());
603                    }
604                    table_infos.insert(table_info.table_id(), table.table_info());
605                }
606                for alter_expr in alter_tables.into_iter() {
607                    statement_executor
608                        .alter_table_inner(alter_expr, ctx.clone())
609                        .await?;
610                }
611            }
612
613            AutoCreateTableType::Trace => {
614                let trace_table_name = ctx
615                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
616                    .unwrap_or(TRACE_TABLE_NAME);
617
618                // note that auto create table shouldn't be ttl instant table
619                // for it's a very unexpected behavior and should be set by user explicitly
620                for mut create_table in create_tables {
621                    if create_table.table_name == trace_services_table_name(trace_table_name) {
622                        // Disable append mode for trace services table since it requires upsert behavior.
623                        create_table
624                            .table_options
625                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
626                        let table = self
627                            .create_physical_table(create_table, None, ctx, statement_executor)
628                            .await?;
629                        let table_info = table.table_info();
630                        if table_info.is_ttl_instant_table() {
631                            instant_table_ids.insert(table_info.table_id());
632                        }
633                        table_infos.insert(table_info.table_id(), table.table_info());
634                    } else {
635                        // prebuilt partition rules for uuid data: see the function
636                        // for more information
637                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
638                            .context(CreatePartitionRulesSnafu)?;
639                        // add skip index to
640                        // - trace_id: when searching by trace id
641                        // - parent_span_id: when searching root span
642                        // - span_name: when searching certain types of span
643                        let index_columns =
644                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
645                        for index_column in index_columns {
646                            if let Some(col) = create_table
647                                .column_defs
648                                .iter_mut()
649                                .find(|c| c.name == index_column)
650                            {
651                                col.options =
652                                    options_from_skipping(&SkippingIndexOptions::default())
653                                        .context(ColumnOptionsSnafu)?;
654                            } else {
655                                warn!(
656                                    "Column {} not found when creating index for trace table: {}.",
657                                    index_column, create_table.table_name
658                                );
659                            }
660                        }
661
662                        // use table_options to mark table model version
663                        create_table.table_options.insert(
664                            TABLE_DATA_MODEL.to_string(),
665                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
666                        );
667
668                        let table = self
669                            .create_physical_table(
670                                create_table,
671                                Some(partitions),
672                                ctx,
673                                statement_executor,
674                            )
675                            .await?;
676                        let table_info = table.table_info();
677                        if table_info.is_ttl_instant_table() {
678                            instant_table_ids.insert(table_info.table_id());
679                        }
680                        table_infos.insert(table_info.table_id(), table.table_info());
681                    }
682                }
683                for alter_expr in alter_tables.into_iter() {
684                    statement_executor
685                        .alter_table_inner(alter_expr, ctx.clone())
686                        .await?;
687                }
688            }
689        }
690
691        Ok(CreateAlterTableResult {
692            instant_table_ids,
693            table_infos,
694        })
695    }
696
697    async fn create_physical_table_on_demand(
698        &self,
699        ctx: &QueryContextRef,
700        physical_table: String,
701        statement_executor: &StatementExecutor,
702    ) -> Result<()> {
703        let catalog_name = ctx.current_catalog();
704        let schema_name = ctx.current_schema();
705
706        // check if exist
707        if self
708            .get_table(catalog_name, &schema_name, &physical_table)
709            .await?
710            .is_some()
711        {
712            return Ok(());
713        }
714
715        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
716        info!("Physical metric table `{table_reference}` does not exist, try creating table");
717
718        // schema with timestamp and field column
719        let default_schema = vec![
720            ColumnSchema {
721                column_name: GREPTIME_TIMESTAMP.to_string(),
722                datatype: ColumnDataType::TimestampMillisecond as _,
723                semantic_type: SemanticType::Timestamp as _,
724                datatype_extension: None,
725                options: None,
726            },
727            ColumnSchema {
728                column_name: GREPTIME_VALUE.to_string(),
729                datatype: ColumnDataType::Float64 as _,
730                semantic_type: SemanticType::Field as _,
731                datatype_extension: None,
732                options: None,
733            },
734        ];
735        let create_table_expr =
736            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
737
738        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
739        create_table_expr
740            .table_options
741            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
742
743        // create physical table
744        let res = statement_executor
745            .create_table_inner(create_table_expr, None, ctx.clone())
746            .await;
747
748        match res {
749            Ok(_) => {
750                info!("Successfully created table {table_reference}",);
751                Ok(())
752            }
753            Err(err) => {
754                error!(err; "Failed to create table {table_reference}");
755                Err(err)
756            }
757        }
758    }
759
760    async fn get_table(
761        &self,
762        catalog: &str,
763        schema: &str,
764        table: &str,
765    ) -> Result<Option<TableRef>> {
766        self.catalog_manager
767            .table(catalog, schema, table, None)
768            .await
769            .context(CatalogSnafu)
770    }
771
772    fn get_create_table_expr_on_demand(
773        &self,
774        req: &RowInsertRequest,
775        create_type: &AutoCreateTableType,
776        ctx: &QueryContextRef,
777    ) -> Result<CreateTableExpr> {
778        let mut table_options = std::collections::HashMap::with_capacity(4);
779        fill_table_options_for_create(&mut table_options, create_type, ctx);
780
781        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
782            // engine should be metric engine when creating logical tables.
783            METRIC_ENGINE_NAME
784        } else {
785            default_engine()
786        };
787
788        let schema = ctx.current_schema();
789        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
790        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
791        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
792        let mut create_table_expr =
793            build_create_table_expr(&table_ref, request_schema, engine_name)?;
794
795        info!("Table `{table_ref}` does not exist, try creating table");
796        create_table_expr.table_options.extend(table_options);
797        Ok(create_table_expr)
798    }
799
800    /// Returns an alter table expression if it finds new columns in the request.
801    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
802    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
803    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
804    /// for more details.
805    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
806    /// input `req`.
807    fn get_alter_table_expr_on_demand(
808        &self,
809        req: &mut RowInsertRequest,
810        table: &TableRef,
811        ctx: &QueryContextRef,
812        accommodate_existing_schema: bool,
813        is_single_value: bool,
814    ) -> Result<Option<AlterTableExpr>> {
815        let catalog_name = ctx.current_catalog();
816        let schema_name = ctx.current_schema();
817        let table_name = table.table_info().name.clone();
818
819        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
820        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
821        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
822        let Some(mut add_columns) = add_columns else {
823            return Ok(None);
824        };
825
826        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
827        if accommodate_existing_schema {
828            let table_schema = table.schema();
829            // Find timestamp column name
830            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
831            // Find field column name if there is only one and `is_single_value` is true.
832            let mut field_col_name = None;
833            if is_single_value {
834                let mut multiple_field_cols = false;
835                table.field_columns().for_each(|col| {
836                    if field_col_name.is_none() {
837                        field_col_name = Some(col.name.clone());
838                    } else {
839                        multiple_field_cols = true;
840                    }
841                });
842                if multiple_field_cols {
843                    field_col_name = None;
844                }
845            }
846
847            // Update column name in request schema for Timestamp/Field columns
848            if let Some(rows) = req.rows.as_mut() {
849                for col in &mut rows.schema {
850                    match col.semantic_type {
851                        x if x == SemanticType::Timestamp as i32 => {
852                            if let Some(ref ts_name) = ts_col_name
853                                && col.column_name != *ts_name
854                            {
855                                col.column_name = ts_name.clone();
856                            }
857                        }
858                        x if x == SemanticType::Field as i32 => {
859                            if let Some(ref field_name) = field_col_name
860                                && col.column_name != *field_name
861                            {
862                                col.column_name = field_name.clone();
863                            }
864                        }
865                        _ => {}
866                    }
867                }
868            }
869
870            // Only keep columns that are tags or non-single field.
871            add_columns.add_columns.retain(|col| {
872                let def = col.column_def.as_ref().unwrap();
873                def.semantic_type == SemanticType::Tag as i32
874                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
875            });
876
877            if add_columns.add_columns.is_empty() {
878                return Ok(None);
879            }
880        }
881
882        Ok(Some(AlterTableExpr {
883            catalog_name: catalog_name.to_string(),
884            schema_name: schema_name.to_string(),
885            table_name: table_name.to_string(),
886            kind: Some(Kind::AddColumns(add_columns)),
887        }))
888    }
889
890    /// Creates a table with options.
891    async fn create_physical_table(
892        &self,
893        mut create_table_expr: CreateTableExpr,
894        partitions: Option<Partitions>,
895        ctx: &QueryContextRef,
896        statement_executor: &StatementExecutor,
897    ) -> Result<TableRef> {
898        {
899            let table_ref = TableReference::full(
900                &create_table_expr.catalog_name,
901                &create_table_expr.schema_name,
902                &create_table_expr.table_name,
903            );
904
905            info!("Table `{table_ref}` does not exist, try creating table");
906        }
907        let res = statement_executor
908            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
909            .await;
910
911        let table_ref = TableReference::full(
912            &create_table_expr.catalog_name,
913            &create_table_expr.schema_name,
914            &create_table_expr.table_name,
915        );
916
917        match res {
918            Ok(table) => {
919                info!(
920                    "Successfully created table {} with options: {:?}",
921                    table_ref, create_table_expr.table_options,
922                );
923                Ok(table)
924            }
925            Err(err) => {
926                error!(err; "Failed to create table {}", table_ref);
927                Err(err)
928            }
929        }
930    }
931
932    async fn create_logical_tables(
933        &self,
934        create_table_exprs: Vec<CreateTableExpr>,
935        ctx: &QueryContextRef,
936        statement_executor: &StatementExecutor,
937    ) -> Result<Vec<TableRef>> {
938        let res = statement_executor
939            .create_logical_tables(&create_table_exprs, ctx.clone())
940            .await;
941
942        match res {
943            Ok(res) => {
944                info!("Successfully created logical tables");
945                Ok(res)
946            }
947            Err(err) => {
948                let failed_tables = create_table_exprs
949                    .into_iter()
950                    .map(|expr| {
951                        format!(
952                            "{}.{}.{}",
953                            expr.catalog_name, expr.schema_name, expr.table_name
954                        )
955                    })
956                    .collect::<Vec<_>>();
957                error!(
958                    err;
959                    "Failed to create logical tables {:?}",
960                    failed_tables
961                );
962                Err(err)
963            }
964        }
965    }
966
    /// Returns a reference to the node manager held by `self`.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }
970
    /// Returns a reference to the partition rule manager held by `self`.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
974}
975
976fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
977    for request in &requests.inserts {
978        let rows = request.rows.as_ref().unwrap();
979        let column_count = rows.schema.len();
980        rows.rows.iter().try_for_each(|r| {
981            ensure!(
982                r.values.len() == column_count,
983                InvalidInsertRequestSnafu {
984                    reason: format!(
985                        "column count mismatch, columns: {}, values: {}",
986                        column_count,
987                        r.values.len()
988                    )
989                }
990            );
991            Ok(())
992        })?;
993    }
994    Ok(())
995}
996
997/// Fill table options for a new table by create type.
998pub fn fill_table_options_for_create(
999    table_options: &mut std::collections::HashMap<String, String>,
1000    create_type: &AutoCreateTableType,
1001    ctx: &QueryContextRef,
1002) {
1003    for key in VALID_TABLE_OPTION_KEYS {
1004        if let Some(value) = ctx.extension(key) {
1005            table_options.insert(key.to_string(), value.to_string());
1006        }
1007    }
1008
1009    match create_type {
1010        AutoCreateTableType::Logical(physical_table) => {
1011            table_options.insert(
1012                LOGICAL_TABLE_METADATA_KEY.to_string(),
1013                physical_table.to_string(),
1014            );
1015        }
1016        AutoCreateTableType::Physical => {
1017            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1018                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1019            }
1020            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1021                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1022            }
1023            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1024                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1025                // We need to set the compaction type explicitly.
1026                table_options.insert(
1027                    COMPACTION_TYPE.to_string(),
1028                    COMPACTION_TYPE_TWCS.to_string(),
1029                );
1030            }
1031        }
1032        // Set append_mode to true for log table.
1033        // because log tables should keep rows with the same ts and tags.
1034        AutoCreateTableType::Log => {
1035            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1036        }
1037        AutoCreateTableType::LastNonNull => {
1038            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1039        }
1040        AutoCreateTableType::Trace => {
1041            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1042        }
1043    }
1044}
1045
/// Builds a `CreateTableExpr` for `table` from the column schemas of an insert request.
///
/// Thin wrapper over `expr_helper::create_table_expr_by_column_schemas`: it forwards
/// `table`, `request_schema` and `engine`, and always passes `None` as the helper's last
/// argument.
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1053
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Table ids of the `ttl=instant` tables.
    instant_table_ids: HashSet<TableId>,
    /// Table info of the created tables, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1061
/// A batch of region insert requests to be mirrored to flownodes.
struct FlowMirrorTask {
    /// Insert requests grouped by the flownode peer they are sent to.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Number of rows counted while building the task; `detach` adds it to the
    /// `DIST_MIRROR_PENDING_ROW_COUNT` metric.
    num_rows: usize,
}
1066
1067impl FlowMirrorTask {
1068    async fn new(
1069        cache: &TableFlownodeSetCacheRef,
1070        requests: impl Iterator<Item = &RegionInsertRequest>,
1071    ) -> Result<Self> {
1072        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1073            HashMap::new();
1074        let mut num_rows = 0;
1075
1076        for req in requests {
1077            let table_id = RegionId::from_u64(req.region_id).table_id();
1078            match src_table_reqs.get_mut(&table_id) {
1079                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1080                // already know this is not source table
1081                Some(None) => continue,
1082                _ => {
1083                    // dedup peers
1084                    let peers = cache
1085                        .get(table_id)
1086                        .await
1087                        .context(RequestInsertsSnafu)?
1088                        .unwrap_or_default()
1089                        .values()
1090                        .cloned()
1091                        .collect::<HashSet<_>>()
1092                        .into_iter()
1093                        .collect::<Vec<_>>();
1094
1095                    if !peers.is_empty() {
1096                        let mut reqs = RegionInsertRequests::default();
1097                        reqs.requests.push(req.clone());
1098                        num_rows += reqs
1099                            .requests
1100                            .iter()
1101                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1102                            .sum::<usize>();
1103                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1104                    } else {
1105                        // insert a empty entry to avoid repeat query
1106                        src_table_reqs.insert(table_id, None);
1107                    }
1108                }
1109            }
1110        }
1111
1112        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1113
1114        for (_table_id, (peers, reqs)) in src_table_reqs
1115            .into_iter()
1116            .filter_map(|(k, v)| v.map(|v| (k, v)))
1117        {
1118            if peers.len() == 1 {
1119                // fast path, zero copy
1120                inserts
1121                    .entry(peers[0].clone())
1122                    .or_default()
1123                    .requests
1124                    .extend(reqs.requests);
1125                continue;
1126            } else {
1127                // TODO(discord9): need to split requests to multiple flownodes
1128                for flownode in peers {
1129                    inserts
1130                        .entry(flownode.clone())
1131                        .or_default()
1132                        .requests
1133                        .extend(reqs.requests.clone());
1134                }
1135            }
1136        }
1137
1138        Ok(Self {
1139            requests: inserts,
1140            num_rows,
1141        })
1142    }
1143
1144    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1145        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1146        for (peer, inserts) in self.requests {
1147            let node_manager = node_manager.clone();
1148            common_runtime::spawn_global(async move {
1149                let result = node_manager
1150                    .flownode(&peer)
1151                    .await
1152                    .handle_inserts(inserts)
1153                    .await
1154                    .context(RequestInsertsSnafu);
1155
1156                match result {
1157                    Ok(resp) => {
1158                        let affected_rows = resp.affected_rows;
1159                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1160                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1161                    }
1162                    Err(err) => {
1163                        error!(err; "Failed to insert data into flownode {}", peer);
1164                    }
1165                }
1166            });
1167        }
1168
1169        Ok(())
1170    }
1171}
1172
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a dummy `test_table` with a millisecond time index column named `ts_name`
    /// and a single nullable float64 field column named `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let time_index = ColumnSchema::new(
            ts_name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field = ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true);

        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![time_index, field])
            .unwrap()
            .build()
            .unwrap();

        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();

        let info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();

        Arc::new(table::Table::new(
            Arc::new(info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With `accommodate_existing_schema` and `is_single_value` both set, mismatching
    /// timestamp/field column names in the request are rewritten to the table's names and
    /// no alter expression is produced.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(kv_backend.clone()).await;
        let flownode_cache = new_table_flownode_set_cache(
            String::new(),
            Cache::new(100),
            kv_backend.clone(),
        );
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            partition_manager,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(flownode_cache),
        );

        // The request uses different names for timestamp and field columns
        let request_columns = vec![
            time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
            field_column_schema("field_wrong", ColumnDataType::Float64),
        ];
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: request_columns,
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request's schema should have updated names for timestamp and field columns
        let req_schema = &req.rows.as_ref().unwrap().schema;
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}