operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TWCS_TIME_WINDOW,
58};
59use store_api::storage::{RegionId, TableId};
60use table::TableRef;
61use table::metadata::TableInfo;
62use table::requests::{
63    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
64    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
65};
66use table::table_reference::TableReference;
67
68use crate::error::{
69    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
70    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
71};
72use crate::expr_helper;
73use crate::region_req_factory::RegionRequestFactory;
74use crate::req_convert::common::preprocess_row_insert_requests;
75use crate::req_convert::insert::{
76    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
77};
78use crate::statement::StatementExecutor;
79
80pub struct Inserter {
81    catalog_manager: CatalogManagerRef,
82    pub(crate) partition_manager: PartitionRuleManagerRef,
83    pub(crate) node_manager: NodeManagerRef,
84    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
85}
86
87pub type InserterRef = Arc<Inserter>;
88
/// Hint describing which kind of table to create automatically when an insert targets a
/// table that does not exist yet.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A logical table, backed by the physical table with the given name.
    Logical(String),
    /// A physical table.
    Physical,
    /// An append-only log table.
    Log,
    /// A table whose rows are merged with the `last_non_null` strategy.
    LastNonNull,
    /// A trace table, created with skipping indexes and default partition rules on
    /// `trace_id`.
    Trace,
}

impl AutoCreateTableType {
    /// Short, stable label of the variant (used as a metrics label value).
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
115
116/// Split insert requests into normal and instant requests.
117///
118/// Where instant requests are requests with ttl=instant,
119/// and normal requests are requests with ttl set to other values.
120///
121/// This is used to split requests for different processing.
122#[derive(Clone)]
123pub struct InstantAndNormalInsertRequests {
124    /// Requests with normal ttl.
125    pub normal_requests: RegionInsertRequests,
126    /// Requests with ttl=instant.
127    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
128    pub instant_requests: RegionInsertRequests,
129}
130
131impl Inserter {
132    pub fn new(
133        catalog_manager: CatalogManagerRef,
134        partition_manager: PartitionRuleManagerRef,
135        node_manager: NodeManagerRef,
136        table_flownode_set_cache: TableFlownodeSetCacheRef,
137    ) -> Self {
138        Self {
139            catalog_manager,
140            partition_manager,
141            node_manager,
142            table_flownode_set_cache,
143        }
144    }
145
146    pub async fn handle_column_inserts(
147        &self,
148        requests: InsertRequests,
149        ctx: QueryContextRef,
150        statement_executor: &StatementExecutor,
151    ) -> Result<Output> {
152        let row_inserts = ColumnToRow::convert(requests)?;
153        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
154            .await
155    }
156
157    /// Handles row inserts request and creates a physical table on demand.
158    pub async fn handle_row_inserts(
159        &self,
160        mut requests: RowInsertRequests,
161        ctx: QueryContextRef,
162        statement_executor: &StatementExecutor,
163        accommodate_existing_schema: bool,
164        is_single_value: bool,
165    ) -> Result<Output> {
166        preprocess_row_insert_requests(&mut requests.inserts)?;
167        self.handle_row_inserts_with_create_type(
168            requests,
169            ctx,
170            statement_executor,
171            AutoCreateTableType::Physical,
172            accommodate_existing_schema,
173            is_single_value,
174        )
175        .await
176    }
177
178    /// Handles row inserts request and creates a log table on demand.
179    pub async fn handle_log_inserts(
180        &self,
181        requests: RowInsertRequests,
182        ctx: QueryContextRef,
183        statement_executor: &StatementExecutor,
184    ) -> Result<Output> {
185        self.handle_row_inserts_with_create_type(
186            requests,
187            ctx,
188            statement_executor,
189            AutoCreateTableType::Log,
190            false,
191            false,
192        )
193        .await
194    }
195
196    pub async fn handle_trace_inserts(
197        &self,
198        requests: RowInsertRequests,
199        ctx: QueryContextRef,
200        statement_executor: &StatementExecutor,
201    ) -> Result<Output> {
202        self.handle_row_inserts_with_create_type(
203            requests,
204            ctx,
205            statement_executor,
206            AutoCreateTableType::Trace,
207            false,
208            false,
209        )
210        .await
211    }
212
213    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
214    pub async fn handle_last_non_null_inserts(
215        &self,
216        requests: RowInsertRequests,
217        ctx: QueryContextRef,
218        statement_executor: &StatementExecutor,
219        accommodate_existing_schema: bool,
220        is_single_value: bool,
221    ) -> Result<Output> {
222        self.handle_row_inserts_with_create_type(
223            requests,
224            ctx,
225            statement_executor,
226            AutoCreateTableType::LastNonNull,
227            accommodate_existing_schema,
228            is_single_value,
229        )
230        .await
231    }
232
233    /// Handles row inserts request with specified [AutoCreateTableType].
234    async fn handle_row_inserts_with_create_type(
235        &self,
236        mut requests: RowInsertRequests,
237        ctx: QueryContextRef,
238        statement_executor: &StatementExecutor,
239        create_type: AutoCreateTableType,
240        accommodate_existing_schema: bool,
241        is_single_value: bool,
242    ) -> Result<Output> {
243        // remove empty requests
244        requests.inserts.retain(|req| {
245            req.rows
246                .as_ref()
247                .map(|r| !r.rows.is_empty())
248                .unwrap_or_default()
249        });
250        validate_column_count_match(&requests)?;
251
252        let CreateAlterTableResult {
253            instant_table_ids,
254            table_infos,
255        } = self
256            .create_or_alter_tables_on_demand(
257                &mut requests,
258                &ctx,
259                create_type,
260                statement_executor,
261                accommodate_existing_schema,
262                is_single_value,
263            )
264            .await?;
265
266        let name_to_info = table_infos
267            .values()
268            .map(|info| (info.name.clone(), info.clone()))
269            .collect::<HashMap<_, _>>();
270        let inserts = RowToRegion::new(
271            name_to_info,
272            instant_table_ids,
273            self.partition_manager.as_ref(),
274        )
275        .convert(requests)
276        .await?;
277
278        self.do_request(inserts, &table_infos, &ctx).await
279    }
280
281    /// Handles row inserts request with metric engine.
282    pub async fn handle_metric_row_inserts(
283        &self,
284        mut requests: RowInsertRequests,
285        ctx: QueryContextRef,
286        statement_executor: &StatementExecutor,
287        physical_table: String,
288    ) -> Result<Output> {
289        // remove empty requests
290        requests.inserts.retain(|req| {
291            req.rows
292                .as_ref()
293                .map(|r| !r.rows.is_empty())
294                .unwrap_or_default()
295        });
296        validate_column_count_match(&requests)?;
297
298        // check and create physical table
299        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
300            .await?;
301
302        // check and create logical tables
303        let CreateAlterTableResult {
304            instant_table_ids,
305            table_infos,
306        } = self
307            .create_or_alter_tables_on_demand(
308                &mut requests,
309                &ctx,
310                AutoCreateTableType::Logical(physical_table.clone()),
311                statement_executor,
312                true,
313                true,
314            )
315            .await?;
316        let name_to_info = table_infos
317            .values()
318            .map(|info| (info.name.clone(), info.clone()))
319            .collect::<HashMap<_, _>>();
320        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
321            .convert(requests)
322            .await?;
323
324        self.do_request(inserts, &table_infos, &ctx).await
325    }
326
327    pub async fn handle_table_insert(
328        &self,
329        request: TableInsertRequest,
330        ctx: QueryContextRef,
331    ) -> Result<Output> {
332        let catalog = request.catalog_name.as_str();
333        let schema = request.schema_name.as_str();
334        let table_name = request.table_name.as_str();
335        let table = self.get_table(catalog, schema, table_name).await?;
336        let table = table.with_context(|| TableNotFoundSnafu {
337            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
338        })?;
339        let table_info = table.table_info();
340
341        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
342            .convert(request)
343            .await?;
344
345        let table_infos =
346            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
347
348        self.do_request(inserts, &table_infos, &ctx).await
349    }
350
351    pub async fn handle_statement_insert(
352        &self,
353        insert: &Insert,
354        ctx: &QueryContextRef,
355    ) -> Result<Output> {
356        let (inserts, table_info) =
357            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
358                .convert(insert, ctx)
359                .await?;
360
361        let table_infos =
362            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
363
364        self.do_request(inserts, &table_infos, ctx).await
365    }
366}
367
368impl Inserter {
369    async fn do_request(
370        &self,
371        requests: InstantAndNormalInsertRequests,
372        table_infos: &HashMap<TableId, Arc<TableInfo>>,
373        ctx: &QueryContextRef,
374    ) -> Result<Output> {
375        // Fill impure default values in the request
376        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
377
378        let write_cost = write_meter!(
379            ctx.current_catalog(),
380            ctx.current_schema(),
381            requests,
382            ctx.channel() as u8
383        );
384        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
385            tracing_context: TracingContext::from_current_span().to_w3c(),
386            dbname: ctx.get_db_string(),
387            ..Default::default()
388        });
389
390        let InstantAndNormalInsertRequests {
391            normal_requests,
392            instant_requests,
393        } = requests;
394
395        // Mirror requests for source table to flownode asynchronously
396        let flow_mirror_task = FlowMirrorTask::new(
397            &self.table_flownode_set_cache,
398            normal_requests
399                .requests
400                .iter()
401                .chain(instant_requests.requests.iter()),
402        )
403        .await?;
404        flow_mirror_task.detach(self.node_manager.clone())?;
405
406        // Write requests to datanode and wait for response
407        let write_tasks = self
408            .group_requests_by_peer(normal_requests)
409            .await?
410            .into_iter()
411            .map(|(peer, inserts)| {
412                let node_manager = self.node_manager.clone();
413                let request = request_factory.build_insert(inserts);
414                common_runtime::spawn_global(async move {
415                    node_manager
416                        .datanode(&peer)
417                        .await
418                        .handle(request)
419                        .await
420                        .context(RequestInsertsSnafu)
421                })
422            });
423        let results = future::try_join_all(write_tasks)
424            .await
425            .context(JoinTaskSnafu)?;
426        let affected_rows = results
427            .into_iter()
428            .map(|resp| resp.map(|r| r.affected_rows))
429            .sum::<Result<AffectedRows>>()?;
430        crate::metrics::DIST_INGEST_ROW_COUNT
431            .with_label_values(&[ctx.get_db_string().as_str()])
432            .inc_by(affected_rows as u64);
433        Ok(Output::new(
434            OutputData::AffectedRows(affected_rows),
435            OutputMeta::new_with_cost(write_cost as _),
436        ))
437    }
438
439    async fn group_requests_by_peer(
440        &self,
441        requests: RegionInsertRequests,
442    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
443        // group by region ids first to reduce repeatedly call `find_region_leader`
444        // TODO(discord9): determine if a addition clone is worth it
445        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
446        for req in requests.requests {
447            let region_id = RegionId::from_u64(req.region_id);
448            requests_per_region
449                .entry(region_id)
450                .or_default()
451                .requests
452                .push(req);
453        }
454
455        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
456
457        for (region_id, reqs) in requests_per_region {
458            let peer = self
459                .partition_manager
460                .find_region_leader(region_id)
461                .await
462                .context(FindRegionLeaderSnafu)?;
463            inserts
464                .entry(peer)
465                .or_default()
466                .requests
467                .extend(reqs.requests);
468        }
469
470        Ok(inserts)
471    }
472
473    /// Creates or alter tables on demand:
474    /// - if table does not exist, create table by inferred CreateExpr
475    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
476    ///
477    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
478    /// This mapping is used in the conversion of RowToRegion.
479    ///
480    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
481    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
482    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
483    /// remote write. This will modify the `RowInsertRequests` in place.
484    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
485    async fn create_or_alter_tables_on_demand(
486        &self,
487        requests: &mut RowInsertRequests,
488        ctx: &QueryContextRef,
489        auto_create_table_type: AutoCreateTableType,
490        statement_executor: &StatementExecutor,
491        accommodate_existing_schema: bool,
492        is_single_value: bool,
493    ) -> Result<CreateAlterTableResult> {
494        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
495            .with_label_values(&[auto_create_table_type.as_str()])
496            .start_timer();
497
498        let catalog = ctx.current_catalog();
499        let schema = ctx.current_schema();
500
501        let mut table_infos = HashMap::new();
502        // If `auto_create_table` hint is disabled, skip creating/altering tables.
503        let auto_create_table_hint = ctx
504            .extension(AUTO_CREATE_TABLE_KEY)
505            .map(|v| v.parse::<bool>())
506            .transpose()
507            .map_err(|_| {
508                InvalidInsertRequestSnafu {
509                    reason: "`auto_create_table` hint must be a boolean",
510                }
511                .build()
512            })?
513            .unwrap_or(true);
514        if !auto_create_table_hint {
515            let mut instant_table_ids = HashSet::new();
516            for req in &requests.inserts {
517                let table = self
518                    .get_table(catalog, &schema, &req.table_name)
519                    .await?
520                    .context(InvalidInsertRequestSnafu {
521                        reason: format!(
522                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
523                            req.table_name
524                        ),
525                    })?;
526                let table_info = table.table_info();
527                if table_info.is_ttl_instant_table() {
528                    instant_table_ids.insert(table_info.table_id());
529                }
530                table_infos.insert(table_info.table_id(), table.table_info());
531            }
532            let ret = CreateAlterTableResult {
533                instant_table_ids,
534                table_infos,
535            };
536            return Ok(ret);
537        }
538
539        let mut create_tables = vec![];
540        let mut alter_tables = vec![];
541        let mut instant_table_ids = HashSet::new();
542
543        for req in &mut requests.inserts {
544            match self.get_table(catalog, &schema, &req.table_name).await? {
545                Some(table) => {
546                    let table_info = table.table_info();
547                    if table_info.is_ttl_instant_table() {
548                        instant_table_ids.insert(table_info.table_id());
549                    }
550                    table_infos.insert(table_info.table_id(), table.table_info());
551                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
552                        req,
553                        &table,
554                        ctx,
555                        accommodate_existing_schema,
556                        is_single_value,
557                    )? {
558                        alter_tables.push(alter_expr);
559                    }
560                }
561                None => {
562                    let create_expr =
563                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
564                    create_tables.push(create_expr);
565                }
566            }
567        }
568
569        match auto_create_table_type {
570            AutoCreateTableType::Logical(_) => {
571                if !create_tables.is_empty() {
572                    // Creates logical tables in batch.
573                    let tables = self
574                        .create_logical_tables(create_tables, ctx, statement_executor)
575                        .await?;
576
577                    for table in tables {
578                        let table_info = table.table_info();
579                        if table_info.is_ttl_instant_table() {
580                            instant_table_ids.insert(table_info.table_id());
581                        }
582                        table_infos.insert(table_info.table_id(), table.table_info());
583                    }
584                }
585                if !alter_tables.is_empty() {
586                    // Alter logical tables in batch.
587                    statement_executor
588                        .alter_logical_tables(alter_tables, ctx.clone())
589                        .await?;
590                }
591            }
592            AutoCreateTableType::Physical
593            | AutoCreateTableType::Log
594            | AutoCreateTableType::LastNonNull => {
595                // note that auto create table shouldn't be ttl instant table
596                // for it's a very unexpected behavior and should be set by user explicitly
597                for create_table in create_tables {
598                    let table = self
599                        .create_physical_table(create_table, None, ctx, statement_executor)
600                        .await?;
601                    let table_info = table.table_info();
602                    if table_info.is_ttl_instant_table() {
603                        instant_table_ids.insert(table_info.table_id());
604                    }
605                    table_infos.insert(table_info.table_id(), table.table_info());
606                }
607                for alter_expr in alter_tables.into_iter() {
608                    statement_executor
609                        .alter_table_inner(alter_expr, ctx.clone())
610                        .await?;
611                }
612            }
613
614            AutoCreateTableType::Trace => {
615                let trace_table_name = ctx
616                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
617                    .unwrap_or(TRACE_TABLE_NAME);
618
619                // note that auto create table shouldn't be ttl instant table
620                // for it's a very unexpected behavior and should be set by user explicitly
621                for mut create_table in create_tables {
622                    if create_table.table_name == trace_services_table_name(trace_table_name)
623                        || create_table.table_name == trace_operations_table_name(trace_table_name)
624                    {
625                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
626                        create_table
627                            .table_options
628                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
629                        let table = self
630                            .create_physical_table(create_table, None, ctx, statement_executor)
631                            .await?;
632                        let table_info = table.table_info();
633                        if table_info.is_ttl_instant_table() {
634                            instant_table_ids.insert(table_info.table_id());
635                        }
636                        table_infos.insert(table_info.table_id(), table.table_info());
637                    } else {
638                        // prebuilt partition rules for uuid data: see the function
639                        // for more information
640                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
641                            .context(CreatePartitionRulesSnafu)?;
642                        // add skip index to
643                        // - trace_id: when searching by trace id
644                        // - parent_span_id: when searching root span
645                        // - span_name: when searching certain types of span
646                        let index_columns =
647                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
648                        for index_column in index_columns {
649                            if let Some(col) = create_table
650                                .column_defs
651                                .iter_mut()
652                                .find(|c| c.name == index_column)
653                            {
654                                col.options =
655                                    options_from_skipping(&SkippingIndexOptions::default())
656                                        .context(ColumnOptionsSnafu)?;
657                            } else {
658                                warn!(
659                                    "Column {} not found when creating index for trace table: {}.",
660                                    index_column, create_table.table_name
661                                );
662                            }
663                        }
664
665                        // use table_options to mark table model version
666                        create_table.table_options.insert(
667                            TABLE_DATA_MODEL.to_string(),
668                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
669                        );
670
671                        let table = self
672                            .create_physical_table(
673                                create_table,
674                                Some(partitions),
675                                ctx,
676                                statement_executor,
677                            )
678                            .await?;
679                        let table_info = table.table_info();
680                        if table_info.is_ttl_instant_table() {
681                            instant_table_ids.insert(table_info.table_id());
682                        }
683                        table_infos.insert(table_info.table_id(), table.table_info());
684                    }
685                }
686                for alter_expr in alter_tables.into_iter() {
687                    statement_executor
688                        .alter_table_inner(alter_expr, ctx.clone())
689                        .await?;
690                }
691            }
692        }
693
694        Ok(CreateAlterTableResult {
695            instant_table_ids,
696            table_infos,
697        })
698    }
699
700    async fn create_physical_table_on_demand(
701        &self,
702        ctx: &QueryContextRef,
703        physical_table: String,
704        statement_executor: &StatementExecutor,
705    ) -> Result<()> {
706        let catalog_name = ctx.current_catalog();
707        let schema_name = ctx.current_schema();
708
709        // check if exist
710        if self
711            .get_table(catalog_name, &schema_name, &physical_table)
712            .await?
713            .is_some()
714        {
715            return Ok(());
716        }
717
718        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
719        info!("Physical metric table `{table_reference}` does not exist, try creating table");
720
721        // schema with timestamp and field column
722        let default_schema = vec![
723            ColumnSchema {
724                column_name: greptime_timestamp().to_string(),
725                datatype: ColumnDataType::TimestampMillisecond as _,
726                semantic_type: SemanticType::Timestamp as _,
727                datatype_extension: None,
728                options: None,
729            },
730            ColumnSchema {
731                column_name: greptime_value().to_string(),
732                datatype: ColumnDataType::Float64 as _,
733                semantic_type: SemanticType::Field as _,
734                datatype_extension: None,
735                options: None,
736            },
737        ];
738        let create_table_expr =
739            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
740
741        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
742        create_table_expr
743            .table_options
744            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
745
746        // create physical table
747        let res = statement_executor
748            .create_table_inner(create_table_expr, None, ctx.clone())
749            .await;
750
751        match res {
752            Ok(_) => {
753                info!("Successfully created table {table_reference}",);
754                Ok(())
755            }
756            Err(err) => {
757                error!(err; "Failed to create table {table_reference}");
758                Err(err)
759            }
760        }
761    }
762
763    async fn get_table(
764        &self,
765        catalog: &str,
766        schema: &str,
767        table: &str,
768    ) -> Result<Option<TableRef>> {
769        self.catalog_manager
770            .table(catalog, schema, table, None)
771            .await
772            .context(CatalogSnafu)
773    }
774
775    fn get_create_table_expr_on_demand(
776        &self,
777        req: &RowInsertRequest,
778        create_type: &AutoCreateTableType,
779        ctx: &QueryContextRef,
780    ) -> Result<CreateTableExpr> {
781        let mut table_options = std::collections::HashMap::with_capacity(4);
782        fill_table_options_for_create(&mut table_options, create_type, ctx);
783
784        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
785            // engine should be metric engine when creating logical tables.
786            METRIC_ENGINE_NAME
787        } else {
788            default_engine()
789        };
790
791        let schema = ctx.current_schema();
792        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
793        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
794        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
795        let mut create_table_expr =
796            build_create_table_expr(&table_ref, request_schema, engine_name)?;
797
798        info!("Table `{table_ref}` does not exist, try creating table");
799        create_table_expr.table_options.extend(table_options);
800        Ok(create_table_expr)
801    }
802
803    /// Returns an alter table expression if it finds new columns in the request.
804    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
805    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
806    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
807    /// for more details.
808    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
809    /// input `req`.
810    fn get_alter_table_expr_on_demand(
811        &self,
812        req: &mut RowInsertRequest,
813        table: &TableRef,
814        ctx: &QueryContextRef,
815        accommodate_existing_schema: bool,
816        is_single_value: bool,
817    ) -> Result<Option<AlterTableExpr>> {
818        let catalog_name = ctx.current_catalog();
819        let schema_name = ctx.current_schema();
820        let table_name = table.table_info().name.clone();
821
822        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
823        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
824        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
825        let Some(mut add_columns) = add_columns else {
826            return Ok(None);
827        };
828
829        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
830        if accommodate_existing_schema {
831            let table_schema = table.schema();
832            // Find timestamp column name
833            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
834            // Find field column name if there is only one and `is_single_value` is true.
835            let mut field_col_name = None;
836            if is_single_value {
837                let mut multiple_field_cols = false;
838                table.field_columns().for_each(|col| {
839                    if field_col_name.is_none() {
840                        field_col_name = Some(col.name.clone());
841                    } else {
842                        multiple_field_cols = true;
843                    }
844                });
845                if multiple_field_cols {
846                    field_col_name = None;
847                }
848            }
849
850            // Update column name in request schema for Timestamp/Field columns
851            if let Some(rows) = req.rows.as_mut() {
852                for col in &mut rows.schema {
853                    match col.semantic_type {
854                        x if x == SemanticType::Timestamp as i32 => {
855                            if let Some(ref ts_name) = ts_col_name
856                                && col.column_name != *ts_name
857                            {
858                                col.column_name = ts_name.clone();
859                            }
860                        }
861                        x if x == SemanticType::Field as i32 => {
862                            if let Some(ref field_name) = field_col_name
863                                && col.column_name != *field_name
864                            {
865                                col.column_name = field_name.clone();
866                            }
867                        }
868                        _ => {}
869                    }
870                }
871            }
872
873            // Only keep columns that are tags or non-single field.
874            add_columns.add_columns.retain(|col| {
875                let def = col.column_def.as_ref().unwrap();
876                def.semantic_type == SemanticType::Tag as i32
877                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
878            });
879
880            if add_columns.add_columns.is_empty() {
881                return Ok(None);
882            }
883        }
884
885        Ok(Some(AlterTableExpr {
886            catalog_name: catalog_name.to_string(),
887            schema_name: schema_name.clone(),
888            table_name: table_name.clone(),
889            kind: Some(Kind::AddColumns(add_columns)),
890        }))
891    }
892
893    /// Creates a table with options.
894    async fn create_physical_table(
895        &self,
896        mut create_table_expr: CreateTableExpr,
897        partitions: Option<Partitions>,
898        ctx: &QueryContextRef,
899        statement_executor: &StatementExecutor,
900    ) -> Result<TableRef> {
901        {
902            let table_ref = TableReference::full(
903                &create_table_expr.catalog_name,
904                &create_table_expr.schema_name,
905                &create_table_expr.table_name,
906            );
907
908            info!("Table `{table_ref}` does not exist, try creating table");
909        }
910        let res = statement_executor
911            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
912            .await;
913
914        let table_ref = TableReference::full(
915            &create_table_expr.catalog_name,
916            &create_table_expr.schema_name,
917            &create_table_expr.table_name,
918        );
919
920        match res {
921            Ok(table) => {
922                info!(
923                    "Successfully created table {} with options: {:?}",
924                    table_ref, create_table_expr.table_options,
925                );
926                Ok(table)
927            }
928            Err(err) => {
929                error!(err; "Failed to create table {}", table_ref);
930                Err(err)
931            }
932        }
933    }
934
935    async fn create_logical_tables(
936        &self,
937        create_table_exprs: Vec<CreateTableExpr>,
938        ctx: &QueryContextRef,
939        statement_executor: &StatementExecutor,
940    ) -> Result<Vec<TableRef>> {
941        let res = statement_executor
942            .create_logical_tables(&create_table_exprs, ctx.clone())
943            .await;
944
945        match res {
946            Ok(res) => {
947                info!("Successfully created logical tables");
948                Ok(res)
949            }
950            Err(err) => {
951                let failed_tables = create_table_exprs
952                    .into_iter()
953                    .map(|expr| {
954                        format!(
955                            "{}.{}.{}",
956                            expr.catalog_name, expr.schema_name, expr.table_name
957                        )
958                    })
959                    .collect::<Vec<_>>();
960                error!(
961                    err;
962                    "Failed to create logical tables {:?}",
963                    failed_tables
964                );
965                Err(err)
966            }
967        }
968    }
969
970    pub fn node_manager(&self) -> &NodeManagerRef {
971        &self.node_manager
972    }
973
974    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
975        &self.partition_manager
976    }
977}
978
979fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
980    for request in &requests.inserts {
981        let rows = request.rows.as_ref().unwrap();
982        let column_count = rows.schema.len();
983        rows.rows.iter().try_for_each(|r| {
984            ensure!(
985                r.values.len() == column_count,
986                InvalidInsertRequestSnafu {
987                    reason: format!(
988                        "column count mismatch, columns: {}, values: {}",
989                        column_count,
990                        r.values.len()
991                    )
992                }
993            );
994            Ok(())
995        })?;
996    }
997    Ok(())
998}
999
1000/// Fill table options for a new table by create type.
1001pub fn fill_table_options_for_create(
1002    table_options: &mut std::collections::HashMap<String, String>,
1003    create_type: &AutoCreateTableType,
1004    ctx: &QueryContextRef,
1005) {
1006    for key in VALID_TABLE_OPTION_KEYS {
1007        if let Some(value) = ctx.extension(key) {
1008            table_options.insert(key.to_string(), value.to_string());
1009        }
1010    }
1011
1012    match create_type {
1013        AutoCreateTableType::Logical(physical_table) => {
1014            table_options.insert(
1015                LOGICAL_TABLE_METADATA_KEY.to_string(),
1016                physical_table.clone(),
1017            );
1018        }
1019        AutoCreateTableType::Physical => {
1020            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1021                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1022            }
1023            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1024                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1025            }
1026            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1027                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1028                // We need to set the compaction type explicitly.
1029                table_options.insert(
1030                    COMPACTION_TYPE.to_string(),
1031                    COMPACTION_TYPE_TWCS.to_string(),
1032                );
1033            }
1034        }
1035        // Set append_mode to true for log table.
1036        // because log tables should keep rows with the same ts and tags.
1037        AutoCreateTableType::Log => {
1038            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1039        }
1040        AutoCreateTableType::LastNonNull => {
1041            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1042        }
1043        AutoCreateTableType::Trace => {
1044            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1045        }
1046    }
1047}
1048
1049pub fn build_create_table_expr(
1050    table: &TableReference,
1051    request_schema: &[ColumnSchema],
1052    engine: &str,
1053) -> Result<CreateTableExpr> {
1054    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1055}
1056
1057/// Result of `create_or_alter_tables_on_demand`.
1058struct CreateAlterTableResult {
1059    /// table ids of ttl=instant tables.
1060    instant_table_ids: HashSet<TableId>,
1061    /// Table Info of the created tables.
1062    table_infos: HashMap<TableId, Arc<TableInfo>>,
1063}
1064
1065struct FlowMirrorTask {
1066    requests: HashMap<Peer, RegionInsertRequests>,
1067    num_rows: usize,
1068}
1069
1070impl FlowMirrorTask {
1071    async fn new(
1072        cache: &TableFlownodeSetCacheRef,
1073        requests: impl Iterator<Item = &RegionInsertRequest>,
1074    ) -> Result<Self> {
1075        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1076            HashMap::new();
1077        let mut num_rows = 0;
1078
1079        for req in requests {
1080            let table_id = RegionId::from_u64(req.region_id).table_id();
1081            match src_table_reqs.get_mut(&table_id) {
1082                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1083                // already know this is not source table
1084                Some(None) => continue,
1085                _ => {
1086                    // dedup peers
1087                    let peers = cache
1088                        .get(table_id)
1089                        .await
1090                        .context(RequestInsertsSnafu)?
1091                        .unwrap_or_default()
1092                        .values()
1093                        .cloned()
1094                        .collect::<HashSet<_>>()
1095                        .into_iter()
1096                        .collect::<Vec<_>>();
1097
1098                    if !peers.is_empty() {
1099                        let mut reqs = RegionInsertRequests::default();
1100                        reqs.requests.push(req.clone());
1101                        num_rows += reqs
1102                            .requests
1103                            .iter()
1104                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1105                            .sum::<usize>();
1106                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1107                    } else {
1108                        // insert a empty entry to avoid repeat query
1109                        src_table_reqs.insert(table_id, None);
1110                    }
1111                }
1112            }
1113        }
1114
1115        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1116
1117        for (_table_id, (peers, reqs)) in src_table_reqs
1118            .into_iter()
1119            .filter_map(|(k, v)| v.map(|v| (k, v)))
1120        {
1121            if peers.len() == 1 {
1122                // fast path, zero copy
1123                inserts
1124                    .entry(peers[0].clone())
1125                    .or_default()
1126                    .requests
1127                    .extend(reqs.requests);
1128                continue;
1129            } else {
1130                // TODO(discord9): need to split requests to multiple flownodes
1131                for flownode in peers {
1132                    inserts
1133                        .entry(flownode.clone())
1134                        .or_default()
1135                        .requests
1136                        .extend(reqs.requests.clone());
1137                }
1138            }
1139        }
1140
1141        Ok(Self {
1142            requests: inserts,
1143            num_rows,
1144        })
1145    }
1146
1147    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1148        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1149        for (peer, inserts) in self.requests {
1150            let node_manager = node_manager.clone();
1151            common_runtime::spawn_global(async move {
1152                let result = node_manager
1153                    .flownode(&peer)
1154                    .await
1155                    .handle_inserts(inserts)
1156                    .await
1157                    .context(RequestInsertsSnafu);
1158
1159                match result {
1160                    Ok(resp) => {
1161                        let affected_rows = resp.affected_rows;
1162                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1163                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1164                    }
1165                    Err(err) => {
1166                        error!(err; "Failed to insert data into flownode {}", peer);
1167                    }
1168                }
1169            });
1170        }
1171
1172        Ok(())
1173    }
1174}
1175
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a single-region `mito` table named `test_table`. Its schema has a millisecond
    /// time index called `ts_name` and a nullable `f64` field called `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let time_index = ColumnSchema::new(
            ts_name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field = ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true);
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![time_index, field])
            .unwrap()
            .build()
            .unwrap();

        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();
        let table_info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();

        Arc::new(table::Table::new(
            Arc::new(table_info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request deliberately names its timestamp and field columns differently from
        // the table.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(kv_backend.clone()).await;
        let flownode_cache = Arc::new(new_table_flownode_set_cache(
            String::new(),
            Cache::new(100),
            kv_backend.clone(),
        ));
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            partition_manager,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            flownode_cache,
        );

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // Both columns must now carry the table's own column names.
        let req_schema = &req.rows.as_ref().unwrap().schema;
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}