operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58    TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
/// Frontend-side insert executor: resolves target tables (creating or
/// altering them on demand), converts insert requests into per-region
/// requests, and dispatches them to datanodes, mirroring them to flownodes.
pub struct Inserter {
    /// Resolves catalog/schema/table names to table metadata.
    catalog_manager: CatalogManagerRef,
    /// Maps regions to leader peers (see `group_requests_by_peer`).
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Client registry used to reach datanodes and flownodes.
    pub(crate) node_manager: NodeManagerRef,
    /// Table -> flownode set cache used when mirroring inserts to flows.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}
87
/// A shared, reference-counted [`Inserter`] handle.
pub type InserterRef = Arc<Inserter>;
89
/// Hint for the table type to create automatically.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A logical table with the physical table name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows by `last_non_null` strategy.
    LastNonNull,
    /// A trace table that gets skipping indexes and default partition rules
    /// on `trace_id`.
    Trace,
}
104
105impl AutoCreateTableType {
106    fn as_str(&self) -> &'static str {
107        match self {
108            AutoCreateTableType::Logical(_) => "logical",
109            AutoCreateTableType::Physical => "physical",
110            AutoCreateTableType::Log => "log",
111            AutoCreateTableType::LastNonNull => "last_non_null",
112            AutoCreateTableType::Trace => "trace",
113        }
114    }
115}
116
/// Split insert requests into normal and instant requests.
///
/// Where instant requests are requests with ttl=instant,
/// and normal requests are requests with ttl set to other values.
///
/// This is used to split requests for different processing: only normal
/// requests are written to datanodes, while both kinds are mirrored to
/// flownodes.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests with normal ttl.
    pub normal_requests: RegionInsertRequests,
    /// Requests with ttl=instant.
    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
    pub instant_requests: RegionInsertRequests,
}
131
132impl Inserter {
133    pub fn new(
134        catalog_manager: CatalogManagerRef,
135        partition_manager: PartitionRuleManagerRef,
136        node_manager: NodeManagerRef,
137        table_flownode_set_cache: TableFlownodeSetCacheRef,
138    ) -> Self {
139        Self {
140            catalog_manager,
141            partition_manager,
142            node_manager,
143            table_flownode_set_cache,
144        }
145    }
146
147    pub async fn handle_column_inserts(
148        &self,
149        requests: InsertRequests,
150        ctx: QueryContextRef,
151        statement_executor: &StatementExecutor,
152    ) -> Result<Output> {
153        let row_inserts = ColumnToRow::convert(requests)?;
154        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
155            .await
156    }
157
158    /// Handles row inserts request and creates a physical table on demand.
159    pub async fn handle_row_inserts(
160        &self,
161        mut requests: RowInsertRequests,
162        ctx: QueryContextRef,
163        statement_executor: &StatementExecutor,
164        accommodate_existing_schema: bool,
165        is_single_value: bool,
166    ) -> Result<Output> {
167        preprocess_row_insert_requests(&mut requests.inserts)?;
168        self.handle_row_inserts_with_create_type(
169            requests,
170            ctx,
171            statement_executor,
172            AutoCreateTableType::Physical,
173            accommodate_existing_schema,
174            is_single_value,
175        )
176        .await
177    }
178
179    /// Handles row inserts request and creates a log table on demand.
180    pub async fn handle_log_inserts(
181        &self,
182        requests: RowInsertRequests,
183        ctx: QueryContextRef,
184        statement_executor: &StatementExecutor,
185    ) -> Result<Output> {
186        self.handle_row_inserts_with_create_type(
187            requests,
188            ctx,
189            statement_executor,
190            AutoCreateTableType::Log,
191            false,
192            false,
193        )
194        .await
195    }
196
197    pub async fn handle_trace_inserts(
198        &self,
199        requests: RowInsertRequests,
200        ctx: QueryContextRef,
201        statement_executor: &StatementExecutor,
202    ) -> Result<Output> {
203        self.handle_row_inserts_with_create_type(
204            requests,
205            ctx,
206            statement_executor,
207            AutoCreateTableType::Trace,
208            false,
209            false,
210        )
211        .await
212    }
213
214    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
215    pub async fn handle_last_non_null_inserts(
216        &self,
217        requests: RowInsertRequests,
218        ctx: QueryContextRef,
219        statement_executor: &StatementExecutor,
220        accommodate_existing_schema: bool,
221        is_single_value: bool,
222    ) -> Result<Output> {
223        self.handle_row_inserts_with_create_type(
224            requests,
225            ctx,
226            statement_executor,
227            AutoCreateTableType::LastNonNull,
228            accommodate_existing_schema,
229            is_single_value,
230        )
231        .await
232    }
233
234    /// Handles row inserts request with specified [AutoCreateTableType].
235    async fn handle_row_inserts_with_create_type(
236        &self,
237        mut requests: RowInsertRequests,
238        ctx: QueryContextRef,
239        statement_executor: &StatementExecutor,
240        create_type: AutoCreateTableType,
241        accommodate_existing_schema: bool,
242        is_single_value: bool,
243    ) -> Result<Output> {
244        // remove empty requests
245        requests.inserts.retain(|req| {
246            req.rows
247                .as_ref()
248                .map(|r| !r.rows.is_empty())
249                .unwrap_or_default()
250        });
251        validate_column_count_match(&requests)?;
252
253        let CreateAlterTableResult {
254            instant_table_ids,
255            table_infos,
256        } = self
257            .create_or_alter_tables_on_demand(
258                &mut requests,
259                &ctx,
260                create_type,
261                statement_executor,
262                accommodate_existing_schema,
263                is_single_value,
264            )
265            .await?;
266
267        let name_to_info = table_infos
268            .values()
269            .map(|info| (info.name.clone(), info.clone()))
270            .collect::<HashMap<_, _>>();
271        let inserts = RowToRegion::new(
272            name_to_info,
273            instant_table_ids,
274            self.partition_manager.as_ref(),
275        )
276        .convert(requests)
277        .await?;
278
279        self.do_request(inserts, &table_infos, &ctx).await
280    }
281
282    /// Handles row inserts request with metric engine.
283    pub async fn handle_metric_row_inserts(
284        &self,
285        mut requests: RowInsertRequests,
286        ctx: QueryContextRef,
287        statement_executor: &StatementExecutor,
288        physical_table: String,
289    ) -> Result<Output> {
290        // remove empty requests
291        requests.inserts.retain(|req| {
292            req.rows
293                .as_ref()
294                .map(|r| !r.rows.is_empty())
295                .unwrap_or_default()
296        });
297        validate_column_count_match(&requests)?;
298
299        // check and create physical table
300        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
301            .await?;
302
303        // check and create logical tables
304        let CreateAlterTableResult {
305            instant_table_ids,
306            table_infos,
307        } = self
308            .create_or_alter_tables_on_demand(
309                &mut requests,
310                &ctx,
311                AutoCreateTableType::Logical(physical_table.clone()),
312                statement_executor,
313                true,
314                true,
315            )
316            .await?;
317        let name_to_info = table_infos
318            .values()
319            .map(|info| (info.name.clone(), info.clone()))
320            .collect::<HashMap<_, _>>();
321        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
322            .convert(requests)
323            .await?;
324
325        self.do_request(inserts, &table_infos, &ctx).await
326    }
327
328    pub async fn handle_table_insert(
329        &self,
330        request: TableInsertRequest,
331        ctx: QueryContextRef,
332    ) -> Result<Output> {
333        let catalog = request.catalog_name.as_str();
334        let schema = request.schema_name.as_str();
335        let table_name = request.table_name.as_str();
336        let table = self.get_table(catalog, schema, table_name).await?;
337        let table = table.with_context(|| TableNotFoundSnafu {
338            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
339        })?;
340        let table_info = table.table_info();
341
342        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
343            .convert(request)
344            .await?;
345
346        let table_infos =
347            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
348
349        self.do_request(inserts, &table_infos, &ctx).await
350    }
351
352    pub async fn handle_statement_insert(
353        &self,
354        insert: &Insert,
355        ctx: &QueryContextRef,
356        statement_executor: &StatementExecutor,
357    ) -> Result<Output> {
358        let (inserts, table_info) =
359            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
360                .convert(insert, ctx, statement_executor)
361                .await?;
362
363        let table_infos =
364            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
365
366        self.do_request(inserts, &table_infos, ctx).await
367    }
368}
369
370impl Inserter {
    /// Sends the converted region insert requests out.
    ///
    /// Normal requests are written to the datanodes hosting the target
    /// regions; both normal and instant requests are mirrored to flownodes
    /// via a detached task (not awaited here). Returns the total number of
    /// affected rows reported by the datanodes, plus the metered write cost.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Fill impure default values in the request
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        // Shared header carrying the tracing context and database name for
        // every region request built below.
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror requests for source table to flownode asynchronously
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write requests to datanode and wait for response.
        // Note: only `normal_requests` are written; instant requests were
        // mirrored above and are dropped at the frontend.
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                // One spawned task per peer so writes to different datanodes
                // proceed concurrently.
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Sum affected rows across peers; any per-peer error aborts the sum.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT
            .with_label_values(&[ctx.get_db_string().as_str()])
            .inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
440
441    async fn group_requests_by_peer(
442        &self,
443        requests: RegionInsertRequests,
444    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
445        // group by region ids first to reduce repeatedly call `find_region_leader`
446        // TODO(discord9): determine if a addition clone is worth it
447        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
448        for req in requests.requests {
449            let region_id = RegionId::from_u64(req.region_id);
450            requests_per_region
451                .entry(region_id)
452                .or_default()
453                .requests
454                .push(req);
455        }
456
457        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
458
459        for (region_id, reqs) in requests_per_region {
460            let peer = self
461                .partition_manager
462                .find_region_leader(region_id)
463                .await
464                .context(FindRegionLeaderSnafu)?;
465            inserts
466                .entry(peer)
467                .or_default()
468                .requests
469                .extend(reqs.requests);
470        }
471
472        Ok(inserts)
473    }
474
    /// Creates or alter tables on demand:
    /// - if table does not exist, create table by inferred CreateExpr
    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
    ///
    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
    /// This mapping is used in the conversion of RowToRegion.
    ///
    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
    /// remote write. This will modify the `RowInsertRequests` in place.
    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // If `auto_create_table` hint is disabled, skip creating/altering tables.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-create disabled: every target table must already exist;
            // collect their infos and instant-ttl ids, then return early.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        // First pass: decide per request whether a create or an alter is
        // needed. Tables that get altered are recorded so their infos can be
        // re-fetched after the alteration completes.
        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut need_refresh_table_infos = HashSet::new();
        let mut instant_table_ids = HashSet::new();

        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                        need_refresh_table_infos.insert((
                            catalog.to_string(),
                            schema.clone(),
                            req.table_name.clone(),
                        ));
                    } else {
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        // Second pass: execute the pending creates/alters, batched where the
        // table type allows it.
        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                if !create_tables.is_empty() {
                    // Creates logical tables in batch.
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    // Alter logical tables in batch.
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // note that auto create table shouldn't be ttl instant table
                // for it's a very unexpected behavior and should be set by user explicitly
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                // The main trace table name may be overridden per session.
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                // note that auto create table shouldn't be ttl instant table
                // for it's a very unexpected behavior and should be set by user explicitly
                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name)
                        || create_table.table_name == trace_operations_table_name(trace_table_name)
                    {
                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        // Remove `ttl` key from table options if it exists
                        create_table.table_options.remove(TTL_KEY);

                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // prebuilt partition rules for uuid data: see the function
                        // for more information
                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
                            .context(CreatePartitionRulesSnafu)?;
                        // add skip index to
                        // - trace_id: when searching by trace id
                        // - parent_span_id: when searching root span
                        // - span_name: when searching certain types of span
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // use table_options to mark table model version
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                Some(partitions),
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        // refresh table infos for altered tables
        for (catalog, schema, table_name) in need_refresh_table_infos {
            let table = self
                .get_table(&catalog, &schema, &table_name)
                .await?
                .context(TableNotFoundSnafu {
                    table_name: common_catalog::format_full_table_name(
                        &catalog,
                        &schema,
                        &table_name,
                    ),
                })?;
            let table_info = table.table_info();
            table_infos.insert(table_info.table_id(), table.table_info());
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }
727
728    async fn create_physical_table_on_demand(
729        &self,
730        ctx: &QueryContextRef,
731        physical_table: String,
732        statement_executor: &StatementExecutor,
733    ) -> Result<()> {
734        let catalog_name = ctx.current_catalog();
735        let schema_name = ctx.current_schema();
736
737        // check if exist
738        if self
739            .get_table(catalog_name, &schema_name, &physical_table)
740            .await?
741            .is_some()
742        {
743            return Ok(());
744        }
745
746        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
747        info!("Physical metric table `{table_reference}` does not exist, try creating table");
748
749        // schema with timestamp and field column
750        let default_schema = vec![
751            ColumnSchema {
752                column_name: greptime_timestamp().to_string(),
753                datatype: ColumnDataType::TimestampMillisecond as _,
754                semantic_type: SemanticType::Timestamp as _,
755                datatype_extension: None,
756                options: None,
757            },
758            ColumnSchema {
759                column_name: greptime_value().to_string(),
760                datatype: ColumnDataType::Float64 as _,
761                semantic_type: SemanticType::Field as _,
762                datatype_extension: None,
763                options: None,
764            },
765        ];
766        let create_table_expr =
767            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
768
769        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
770        create_table_expr
771            .table_options
772            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
773
774        // create physical table
775        let res = statement_executor
776            .create_table_inner(create_table_expr, None, ctx.clone())
777            .await;
778
779        match res {
780            Ok(_) => {
781                info!("Successfully created table {table_reference}",);
782                Ok(())
783            }
784            Err(err) => {
785                error!(err; "Failed to create table {table_reference}");
786                Err(err)
787            }
788        }
789    }
790
791    async fn get_table(
792        &self,
793        catalog: &str,
794        schema: &str,
795        table: &str,
796    ) -> Result<Option<TableRef>> {
797        self.catalog_manager
798            .table(catalog, schema, table, None)
799            .await
800            .context(CatalogSnafu)
801    }
802
803    fn get_create_table_expr_on_demand(
804        &self,
805        req: &RowInsertRequest,
806        create_type: &AutoCreateTableType,
807        ctx: &QueryContextRef,
808    ) -> Result<CreateTableExpr> {
809        let mut table_options = std::collections::HashMap::with_capacity(4);
810        fill_table_options_for_create(&mut table_options, create_type, ctx);
811
812        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
813            // engine should be metric engine when creating logical tables.
814            METRIC_ENGINE_NAME
815        } else {
816            default_engine()
817        };
818
819        let schema = ctx.current_schema();
820        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
821        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
822        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
823        let mut create_table_expr =
824            build_create_table_expr(&table_ref, request_schema, engine_name)?;
825
826        info!("Table `{table_ref}` does not exist, try creating table");
827        create_table_expr.table_options.extend(table_options);
828        Ok(create_table_expr)
829    }
830
831    /// Returns an alter table expression if it finds new columns in the request.
832    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
833    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
834    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
835    /// for more details.
836    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
837    /// input `req`.
838    fn get_alter_table_expr_on_demand(
839        &self,
840        req: &mut RowInsertRequest,
841        table: &TableRef,
842        ctx: &QueryContextRef,
843        accommodate_existing_schema: bool,
844        is_single_value: bool,
845    ) -> Result<Option<AlterTableExpr>> {
846        let catalog_name = ctx.current_catalog();
847        let schema_name = ctx.current_schema();
848        let table_name = table.table_info().name.clone();
849
850        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
851        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
852        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
853        let Some(mut add_columns) = add_columns else {
854            return Ok(None);
855        };
856
857        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
858        if accommodate_existing_schema {
859            let table_schema = table.schema();
860            // Find timestamp column name
861            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
862            // Find field column name if there is only one and `is_single_value` is true.
863            let mut field_col_name = None;
864            if is_single_value {
865                let mut multiple_field_cols = false;
866                table.field_columns().for_each(|col| {
867                    if field_col_name.is_none() {
868                        field_col_name = Some(col.name.clone());
869                    } else {
870                        multiple_field_cols = true;
871                    }
872                });
873                if multiple_field_cols {
874                    field_col_name = None;
875                }
876            }
877
878            // Update column name in request schema for Timestamp/Field columns
879            if let Some(rows) = req.rows.as_mut() {
880                for col in &mut rows.schema {
881                    match col.semantic_type {
882                        x if x == SemanticType::Timestamp as i32 => {
883                            if let Some(ref ts_name) = ts_col_name
884                                && col.column_name != *ts_name
885                            {
886                                col.column_name = ts_name.clone();
887                            }
888                        }
889                        x if x == SemanticType::Field as i32 => {
890                            if let Some(ref field_name) = field_col_name
891                                && col.column_name != *field_name
892                            {
893                                col.column_name = field_name.clone();
894                            }
895                        }
896                        _ => {}
897                    }
898                }
899            }
900
901            // Only keep columns that are tags or non-single field.
902            add_columns.add_columns.retain(|col| {
903                let def = col.column_def.as_ref().unwrap();
904                def.semantic_type == SemanticType::Tag as i32
905                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
906            });
907
908            if add_columns.add_columns.is_empty() {
909                return Ok(None);
910            }
911        }
912
913        Ok(Some(AlterTableExpr {
914            catalog_name: catalog_name.to_string(),
915            schema_name: schema_name.clone(),
916            table_name: table_name.clone(),
917            kind: Some(Kind::AddColumns(add_columns)),
918        }))
919    }
920
921    /// Creates a table with options.
922    async fn create_physical_table(
923        &self,
924        mut create_table_expr: CreateTableExpr,
925        partitions: Option<Partitions>,
926        ctx: &QueryContextRef,
927        statement_executor: &StatementExecutor,
928    ) -> Result<TableRef> {
929        {
930            let table_ref = TableReference::full(
931                &create_table_expr.catalog_name,
932                &create_table_expr.schema_name,
933                &create_table_expr.table_name,
934            );
935
936            info!("Table `{table_ref}` does not exist, try creating table");
937        }
938        let res = statement_executor
939            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
940            .await;
941
942        let table_ref = TableReference::full(
943            &create_table_expr.catalog_name,
944            &create_table_expr.schema_name,
945            &create_table_expr.table_name,
946        );
947
948        match res {
949            Ok(table) => {
950                info!(
951                    "Successfully created table {} with options: {:?}",
952                    table_ref, create_table_expr.table_options,
953                );
954                Ok(table)
955            }
956            Err(err) => {
957                error!(err; "Failed to create table {}", table_ref);
958                Err(err)
959            }
960        }
961    }
962
963    async fn create_logical_tables(
964        &self,
965        create_table_exprs: Vec<CreateTableExpr>,
966        ctx: &QueryContextRef,
967        statement_executor: &StatementExecutor,
968    ) -> Result<Vec<TableRef>> {
969        let res = statement_executor
970            .create_logical_tables(&create_table_exprs, ctx.clone())
971            .await;
972
973        match res {
974            Ok(res) => {
975                info!("Successfully created logical tables");
976                Ok(res)
977            }
978            Err(err) => {
979                let failed_tables = create_table_exprs
980                    .into_iter()
981                    .map(|expr| {
982                        format!(
983                            "{}.{}.{}",
984                            expr.catalog_name, expr.schema_name, expr.table_name
985                        )
986                    })
987                    .collect::<Vec<_>>();
988                error!(
989                    err;
990                    "Failed to create logical tables {:?}",
991                    failed_tables
992                );
993                Err(err)
994            }
995        }
996    }
997
    /// Returns a reference to the node manager used to dispatch requests to
    /// datanodes and flownodes.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }
1001
    /// Returns a reference to the partition rule manager used to map rows to
    /// regions.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
1005}
1006
1007fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
1008    for request in &requests.inserts {
1009        let rows = request.rows.as_ref().unwrap();
1010        let column_count = rows.schema.len();
1011        rows.rows.iter().try_for_each(|r| {
1012            ensure!(
1013                r.values.len() == column_count,
1014                InvalidInsertRequestSnafu {
1015                    reason: format!(
1016                        "column count mismatch, columns: {}, values: {}",
1017                        column_count,
1018                        r.values.len()
1019                    )
1020                }
1021            );
1022            Ok(())
1023        })?;
1024    }
1025    Ok(())
1026}
1027
1028/// Fill table options for a new table by create type.
1029pub fn fill_table_options_for_create(
1030    table_options: &mut std::collections::HashMap<String, String>,
1031    create_type: &AutoCreateTableType,
1032    ctx: &QueryContextRef,
1033) {
1034    for key in VALID_TABLE_OPTION_KEYS {
1035        if let Some(value) = ctx.extension(key) {
1036            table_options.insert(key.to_string(), value.to_string());
1037        }
1038    }
1039
1040    match create_type {
1041        AutoCreateTableType::Logical(physical_table) => {
1042            table_options.insert(
1043                LOGICAL_TABLE_METADATA_KEY.to_string(),
1044                physical_table.clone(),
1045            );
1046        }
1047        AutoCreateTableType::Physical => {
1048            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1049                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1050            }
1051            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1052                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1053            }
1054            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1055                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1056                // We need to set the compaction type explicitly.
1057                table_options.insert(
1058                    COMPACTION_TYPE.to_string(),
1059                    COMPACTION_TYPE_TWCS.to_string(),
1060                );
1061            }
1062        }
1063        // Set append_mode to true for log table.
1064        // because log tables should keep rows with the same ts and tags.
1065        AutoCreateTableType::Log => {
1066            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1067        }
1068        AutoCreateTableType::LastNonNull => {
1069            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1070        }
1071        AutoCreateTableType::Trace => {
1072            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1073        }
1074    }
1075}
1076
/// Builds a `CreateTableExpr` for `table` from the request's column schemas,
/// targeting the given storage `engine`.
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1084
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Table ids of the tables with `ttl=instant`.
    instant_table_ids: HashSet<TableId>,
    /// Table info of the created tables, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1092
/// A task that mirrors region insert requests to the flownodes consuming the
/// affected source tables.
struct FlowMirrorTask {
    /// Insert requests grouped by destination flownode peer.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count tracked for the mirror pending-row metric.
    num_rows: usize,
}
1097
1098impl FlowMirrorTask {
1099    async fn new(
1100        cache: &TableFlownodeSetCacheRef,
1101        requests: impl Iterator<Item = &RegionInsertRequest>,
1102    ) -> Result<Self> {
1103        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1104            HashMap::new();
1105        let mut num_rows = 0;
1106
1107        for req in requests {
1108            let table_id = RegionId::from_u64(req.region_id).table_id();
1109            match src_table_reqs.get_mut(&table_id) {
1110                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1111                // already know this is not source table
1112                Some(None) => continue,
1113                _ => {
1114                    // dedup peers
1115                    let peers = cache
1116                        .get(table_id)
1117                        .await
1118                        .context(RequestInsertsSnafu)?
1119                        .unwrap_or_default()
1120                        .values()
1121                        .cloned()
1122                        .collect::<HashSet<_>>()
1123                        .into_iter()
1124                        .collect::<Vec<_>>();
1125
1126                    if !peers.is_empty() {
1127                        let mut reqs = RegionInsertRequests::default();
1128                        reqs.requests.push(req.clone());
1129                        num_rows += reqs
1130                            .requests
1131                            .iter()
1132                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1133                            .sum::<usize>();
1134                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1135                    } else {
1136                        // insert a empty entry to avoid repeat query
1137                        src_table_reqs.insert(table_id, None);
1138                    }
1139                }
1140            }
1141        }
1142
1143        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1144
1145        for (_table_id, (peers, reqs)) in src_table_reqs
1146            .into_iter()
1147            .filter_map(|(k, v)| v.map(|v| (k, v)))
1148        {
1149            if peers.len() == 1 {
1150                // fast path, zero copy
1151                inserts
1152                    .entry(peers[0].clone())
1153                    .or_default()
1154                    .requests
1155                    .extend(reqs.requests);
1156                continue;
1157            } else {
1158                // TODO(discord9): need to split requests to multiple flownodes
1159                for flownode in peers {
1160                    inserts
1161                        .entry(flownode.clone())
1162                        .or_default()
1163                        .requests
1164                        .extend(reqs.requests.clone());
1165                }
1166            }
1167        }
1168
1169        Ok(Self {
1170            requests: inserts,
1171            num_rows,
1172        })
1173    }
1174
1175    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1176        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1177        for (peer, inserts) in self.requests {
1178            let node_manager = node_manager.clone();
1179            common_runtime::spawn_global(async move {
1180                let result = node_manager
1181                    .flownode(&peer)
1182                    .await
1183                    .handle_inserts(inserts)
1184                    .await
1185                    .context(RequestInsertsSnafu);
1186
1187                match result {
1188                    Ok(resp) => {
1189                        let affected_rows = resp.affected_rows;
1190                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1191                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1192                    }
1193                    Err(err) => {
1194                        error!(err; "Failed to insert data into flownode {}", peer);
1195                    }
1196                }
1197            });
1198        }
1199
1200        Ok(())
1201    }
1202}
1203
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a dummy table whose schema has one time index column and one
    /// field column with the given names.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let ts_column = ColumnSchema::new(
            ts_name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field_column =
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true);
        let table_schema =
            datatypes::schema::SchemaBuilder::try_from_columns(vec![ts_column, field_column])
                .unwrap()
                .build()
                .unwrap();

        let table_meta = TableMetaBuilder::empty()
            .schema(Arc::new(table_schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .build()
            .unwrap();

        let table_info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(table_meta)
                .build()
                .unwrap(),
        );

        Arc::new(table::Table::new(
            table_info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request names its timestamp and field columns differently from
        // the existing table schema.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );

        // With `accommodate_existing_schema` and `is_single_value` both set,
        // the mismatched names are rewritten in place instead of producing an
        // ALTER TABLE expression.
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request schema should now carry the table's column names.
        let rewritten = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(rewritten[0].column_name, ts_name);
        assert_eq!(rewritten[1].column_name, field_name);
    }
}