operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58    TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
81pub struct Inserter {
82    catalog_manager: CatalogManagerRef,
83    pub(crate) partition_manager: PartitionRuleManagerRef,
84    pub(crate) node_manager: NodeManagerRef,
85    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
86}
87
88pub type InserterRef = Arc<Inserter>;
89
/// Hint for the table type to create automatically.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AutoCreateTableType {
    /// A logical table with the physical table name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows by `last_non_null` strategy.
    LastNonNull,
    /// A table that builds indexes and default partition rules on `trace_id`.
    Trace,
}

impl AutoCreateTableType {
    /// Returns a short static label for this table type.
    ///
    /// The label ignores any payload (the physical table name of
    /// [`AutoCreateTableType::Logical`] is not included), so the set of
    /// possible values is fixed.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
116
117/// Split insert requests into normal and instant requests.
118///
119/// Where instant requests are requests with ttl=instant,
120/// and normal requests are requests with ttl set to other values.
121///
122/// This is used to split requests for different processing.
123#[derive(Clone)]
124pub struct InstantAndNormalInsertRequests {
125    /// Requests with normal ttl.
126    pub normal_requests: RegionInsertRequests,
127    /// Requests with ttl=instant.
128    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
129    pub instant_requests: RegionInsertRequests,
130}
131
132impl Inserter {
133    pub fn new(
134        catalog_manager: CatalogManagerRef,
135        partition_manager: PartitionRuleManagerRef,
136        node_manager: NodeManagerRef,
137        table_flownode_set_cache: TableFlownodeSetCacheRef,
138    ) -> Self {
139        Self {
140            catalog_manager,
141            partition_manager,
142            node_manager,
143            table_flownode_set_cache,
144        }
145    }
146
147    pub async fn handle_column_inserts(
148        &self,
149        requests: InsertRequests,
150        ctx: QueryContextRef,
151        statement_executor: &StatementExecutor,
152    ) -> Result<Output> {
153        let row_inserts = ColumnToRow::convert(requests)?;
154        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
155            .await
156    }
157
158    /// Handles row inserts request and creates a physical table on demand.
159    pub async fn handle_row_inserts(
160        &self,
161        mut requests: RowInsertRequests,
162        ctx: QueryContextRef,
163        statement_executor: &StatementExecutor,
164        accommodate_existing_schema: bool,
165        is_single_value: bool,
166    ) -> Result<Output> {
167        preprocess_row_insert_requests(&mut requests.inserts)?;
168        self.handle_row_inserts_with_create_type(
169            requests,
170            ctx,
171            statement_executor,
172            AutoCreateTableType::Physical,
173            accommodate_existing_schema,
174            is_single_value,
175        )
176        .await
177    }
178
179    /// Handles row inserts request and creates a log table on demand.
180    pub async fn handle_log_inserts(
181        &self,
182        requests: RowInsertRequests,
183        ctx: QueryContextRef,
184        statement_executor: &StatementExecutor,
185    ) -> Result<Output> {
186        self.handle_row_inserts_with_create_type(
187            requests,
188            ctx,
189            statement_executor,
190            AutoCreateTableType::Log,
191            false,
192            false,
193        )
194        .await
195    }
196
197    pub async fn handle_trace_inserts(
198        &self,
199        requests: RowInsertRequests,
200        ctx: QueryContextRef,
201        statement_executor: &StatementExecutor,
202    ) -> Result<Output> {
203        self.handle_row_inserts_with_create_type(
204            requests,
205            ctx,
206            statement_executor,
207            AutoCreateTableType::Trace,
208            false,
209            false,
210        )
211        .await
212    }
213
214    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
215    pub async fn handle_last_non_null_inserts(
216        &self,
217        requests: RowInsertRequests,
218        ctx: QueryContextRef,
219        statement_executor: &StatementExecutor,
220        accommodate_existing_schema: bool,
221        is_single_value: bool,
222    ) -> Result<Output> {
223        self.handle_row_inserts_with_create_type(
224            requests,
225            ctx,
226            statement_executor,
227            AutoCreateTableType::LastNonNull,
228            accommodate_existing_schema,
229            is_single_value,
230        )
231        .await
232    }
233
234    /// Handles row inserts request with specified [AutoCreateTableType].
235    async fn handle_row_inserts_with_create_type(
236        &self,
237        mut requests: RowInsertRequests,
238        ctx: QueryContextRef,
239        statement_executor: &StatementExecutor,
240        create_type: AutoCreateTableType,
241        accommodate_existing_schema: bool,
242        is_single_value: bool,
243    ) -> Result<Output> {
244        // remove empty requests
245        requests.inserts.retain(|req| {
246            req.rows
247                .as_ref()
248                .map(|r| !r.rows.is_empty())
249                .unwrap_or_default()
250        });
251        validate_column_count_match(&requests)?;
252
253        let CreateAlterTableResult {
254            instant_table_ids,
255            table_infos,
256        } = self
257            .create_or_alter_tables_on_demand(
258                &mut requests,
259                &ctx,
260                create_type,
261                statement_executor,
262                accommodate_existing_schema,
263                is_single_value,
264            )
265            .await?;
266
267        let name_to_info = table_infos
268            .values()
269            .map(|info| (info.name.clone(), info.clone()))
270            .collect::<HashMap<_, _>>();
271        let inserts = RowToRegion::new(
272            name_to_info,
273            instant_table_ids,
274            self.partition_manager.as_ref(),
275        )
276        .convert(requests)
277        .await?;
278
279        self.do_request(inserts, &table_infos, &ctx).await
280    }
281
282    /// Handles row inserts request with metric engine.
283    pub async fn handle_metric_row_inserts(
284        &self,
285        mut requests: RowInsertRequests,
286        ctx: QueryContextRef,
287        statement_executor: &StatementExecutor,
288        physical_table: String,
289    ) -> Result<Output> {
290        // remove empty requests
291        requests.inserts.retain(|req| {
292            req.rows
293                .as_ref()
294                .map(|r| !r.rows.is_empty())
295                .unwrap_or_default()
296        });
297        validate_column_count_match(&requests)?;
298
299        // check and create physical table
300        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
301            .await?;
302
303        // check and create logical tables
304        let CreateAlterTableResult {
305            instant_table_ids,
306            table_infos,
307        } = self
308            .create_or_alter_tables_on_demand(
309                &mut requests,
310                &ctx,
311                AutoCreateTableType::Logical(physical_table.clone()),
312                statement_executor,
313                true,
314                true,
315            )
316            .await?;
317        let name_to_info = table_infos
318            .values()
319            .map(|info| (info.name.clone(), info.clone()))
320            .collect::<HashMap<_, _>>();
321        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
322            .convert(requests)
323            .await?;
324
325        self.do_request(inserts, &table_infos, &ctx).await
326    }
327
328    pub async fn handle_table_insert(
329        &self,
330        request: TableInsertRequest,
331        ctx: QueryContextRef,
332    ) -> Result<Output> {
333        let catalog = request.catalog_name.as_str();
334        let schema = request.schema_name.as_str();
335        let table_name = request.table_name.as_str();
336        let table = self.get_table(catalog, schema, table_name).await?;
337        let table = table.with_context(|| TableNotFoundSnafu {
338            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
339        })?;
340        let table_info = table.table_info();
341
342        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
343            .convert(request)
344            .await?;
345
346        let table_infos =
347            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
348
349        self.do_request(inserts, &table_infos, &ctx).await
350    }
351
352    pub async fn handle_statement_insert(
353        &self,
354        insert: &Insert,
355        ctx: &QueryContextRef,
356        statement_executor: &StatementExecutor,
357    ) -> Result<Output> {
358        let (inserts, table_info) =
359            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
360                .convert(insert, ctx, statement_executor)
361                .await?;
362
363        let table_infos =
364            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
365
366        self.do_request(inserts, &table_infos, ctx).await
367    }
368}
369
370impl Inserter {
371    async fn do_request(
372        &self,
373        requests: InstantAndNormalInsertRequests,
374        table_infos: &HashMap<TableId, Arc<TableInfo>>,
375        ctx: &QueryContextRef,
376    ) -> Result<Output> {
377        // Fill impure default values in the request
378        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
379
380        let write_cost = write_meter!(
381            ctx.current_catalog(),
382            ctx.current_schema(),
383            requests,
384            ctx.channel() as u8
385        );
386        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
387            tracing_context: TracingContext::from_current_span().to_w3c(),
388            dbname: ctx.get_db_string(),
389            ..Default::default()
390        });
391
392        let InstantAndNormalInsertRequests {
393            normal_requests,
394            instant_requests,
395        } = requests;
396
397        // Mirror requests for source table to flownode asynchronously
398        let flow_mirror_task = FlowMirrorTask::new(
399            &self.table_flownode_set_cache,
400            normal_requests
401                .requests
402                .iter()
403                .chain(instant_requests.requests.iter()),
404        )
405        .await?;
406        flow_mirror_task.detach(self.node_manager.clone())?;
407
408        // Write requests to datanode and wait for response
409        let write_tasks = self
410            .group_requests_by_peer(normal_requests)
411            .await?
412            .into_iter()
413            .map(|(peer, inserts)| {
414                let node_manager = self.node_manager.clone();
415                let request = request_factory.build_insert(inserts);
416                common_runtime::spawn_global(async move {
417                    node_manager
418                        .datanode(&peer)
419                        .await
420                        .handle(request)
421                        .await
422                        .context(RequestInsertsSnafu)
423                })
424            });
425        let results = future::try_join_all(write_tasks)
426            .await
427            .context(JoinTaskSnafu)?;
428        let affected_rows = results
429            .into_iter()
430            .map(|resp| resp.map(|r| r.affected_rows))
431            .sum::<Result<AffectedRows>>()?;
432        crate::metrics::DIST_INGEST_ROW_COUNT
433            .with_label_values(&[ctx.get_db_string().as_str()])
434            .inc_by(affected_rows as u64);
435        Ok(Output::new(
436            OutputData::AffectedRows(affected_rows),
437            OutputMeta::new_with_cost(write_cost as _),
438        ))
439    }
440
441    async fn group_requests_by_peer(
442        &self,
443        requests: RegionInsertRequests,
444    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
445        // group by region ids first to reduce repeatedly call `find_region_leader`
446        // TODO(discord9): determine if a addition clone is worth it
447        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
448        for req in requests.requests {
449            let region_id = RegionId::from_u64(req.region_id);
450            requests_per_region
451                .entry(region_id)
452                .or_default()
453                .requests
454                .push(req);
455        }
456
457        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
458
459        for (region_id, reqs) in requests_per_region {
460            let peer = self
461                .partition_manager
462                .find_region_leader(region_id)
463                .await
464                .context(FindRegionLeaderSnafu)?;
465            inserts
466                .entry(peer)
467                .or_default()
468                .requests
469                .extend(reqs.requests);
470        }
471
472        Ok(inserts)
473    }
474
475    /// Creates or alter tables on demand:
476    /// - if table does not exist, create table by inferred CreateExpr
477    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
478    ///
479    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
480    /// This mapping is used in the conversion of RowToRegion.
481    ///
482    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
483    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
484    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
485    /// remote write. This will modify the `RowInsertRequests` in place.
486    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
487    async fn create_or_alter_tables_on_demand(
488        &self,
489        requests: &mut RowInsertRequests,
490        ctx: &QueryContextRef,
491        auto_create_table_type: AutoCreateTableType,
492        statement_executor: &StatementExecutor,
493        accommodate_existing_schema: bool,
494        is_single_value: bool,
495    ) -> Result<CreateAlterTableResult> {
496        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
497            .with_label_values(&[auto_create_table_type.as_str()])
498            .start_timer();
499
500        let catalog = ctx.current_catalog();
501        let schema = ctx.current_schema();
502
503        let mut table_infos = HashMap::new();
504        // If `auto_create_table` hint is disabled, skip creating/altering tables.
505        let auto_create_table_hint = ctx
506            .extension(AUTO_CREATE_TABLE_KEY)
507            .map(|v| v.parse::<bool>())
508            .transpose()
509            .map_err(|_| {
510                InvalidInsertRequestSnafu {
511                    reason: "`auto_create_table` hint must be a boolean",
512                }
513                .build()
514            })?
515            .unwrap_or(true);
516        if !auto_create_table_hint {
517            let mut instant_table_ids = HashSet::new();
518            for req in &requests.inserts {
519                let table = self
520                    .get_table(catalog, &schema, &req.table_name)
521                    .await?
522                    .context(InvalidInsertRequestSnafu {
523                        reason: format!(
524                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
525                            req.table_name
526                        ),
527                    })?;
528                let table_info = table.table_info();
529                if table_info.is_ttl_instant_table() {
530                    instant_table_ids.insert(table_info.table_id());
531                }
532                table_infos.insert(table_info.table_id(), table.table_info());
533            }
534            let ret = CreateAlterTableResult {
535                instant_table_ids,
536                table_infos,
537            };
538            return Ok(ret);
539        }
540
541        let mut create_tables = vec![];
542        let mut alter_tables = vec![];
543        let mut instant_table_ids = HashSet::new();
544
545        for req in &mut requests.inserts {
546            match self.get_table(catalog, &schema, &req.table_name).await? {
547                Some(table) => {
548                    let table_info = table.table_info();
549                    if table_info.is_ttl_instant_table() {
550                        instant_table_ids.insert(table_info.table_id());
551                    }
552                    table_infos.insert(table_info.table_id(), table.table_info());
553                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
554                        req,
555                        &table,
556                        ctx,
557                        accommodate_existing_schema,
558                        is_single_value,
559                    )? {
560                        alter_tables.push(alter_expr);
561                    }
562                }
563                None => {
564                    let create_expr =
565                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
566                    create_tables.push(create_expr);
567                }
568            }
569        }
570
571        match auto_create_table_type {
572            AutoCreateTableType::Logical(_) => {
573                if !create_tables.is_empty() {
574                    // Creates logical tables in batch.
575                    let tables = self
576                        .create_logical_tables(create_tables, ctx, statement_executor)
577                        .await?;
578
579                    for table in tables {
580                        let table_info = table.table_info();
581                        if table_info.is_ttl_instant_table() {
582                            instant_table_ids.insert(table_info.table_id());
583                        }
584                        table_infos.insert(table_info.table_id(), table.table_info());
585                    }
586                }
587                if !alter_tables.is_empty() {
588                    // Alter logical tables in batch.
589                    statement_executor
590                        .alter_logical_tables(alter_tables, ctx.clone())
591                        .await?;
592                }
593            }
594            AutoCreateTableType::Physical
595            | AutoCreateTableType::Log
596            | AutoCreateTableType::LastNonNull => {
597                // note that auto create table shouldn't be ttl instant table
598                // for it's a very unexpected behavior and should be set by user explicitly
599                for create_table in create_tables {
600                    let table = self
601                        .create_physical_table(create_table, None, ctx, statement_executor)
602                        .await?;
603                    let table_info = table.table_info();
604                    if table_info.is_ttl_instant_table() {
605                        instant_table_ids.insert(table_info.table_id());
606                    }
607                    table_infos.insert(table_info.table_id(), table.table_info());
608                }
609                for alter_expr in alter_tables.into_iter() {
610                    statement_executor
611                        .alter_table_inner(alter_expr, ctx.clone())
612                        .await?;
613                }
614            }
615
616            AutoCreateTableType::Trace => {
617                let trace_table_name = ctx
618                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
619                    .unwrap_or(TRACE_TABLE_NAME);
620
621                // note that auto create table shouldn't be ttl instant table
622                // for it's a very unexpected behavior and should be set by user explicitly
623                for mut create_table in create_tables {
624                    if create_table.table_name == trace_services_table_name(trace_table_name)
625                        || create_table.table_name == trace_operations_table_name(trace_table_name)
626                    {
627                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
628                        create_table
629                            .table_options
630                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
631                        // Remove `ttl` key from table options if it exists
632                        create_table.table_options.remove(TTL_KEY);
633
634                        let table = self
635                            .create_physical_table(create_table, None, ctx, statement_executor)
636                            .await?;
637                        let table_info = table.table_info();
638                        if table_info.is_ttl_instant_table() {
639                            instant_table_ids.insert(table_info.table_id());
640                        }
641                        table_infos.insert(table_info.table_id(), table.table_info());
642                    } else {
643                        // prebuilt partition rules for uuid data: see the function
644                        // for more information
645                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
646                            .context(CreatePartitionRulesSnafu)?;
647                        // add skip index to
648                        // - trace_id: when searching by trace id
649                        // - parent_span_id: when searching root span
650                        // - span_name: when searching certain types of span
651                        let index_columns =
652                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
653                        for index_column in index_columns {
654                            if let Some(col) = create_table
655                                .column_defs
656                                .iter_mut()
657                                .find(|c| c.name == index_column)
658                            {
659                                col.options =
660                                    options_from_skipping(&SkippingIndexOptions::default())
661                                        .context(ColumnOptionsSnafu)?;
662                            } else {
663                                warn!(
664                                    "Column {} not found when creating index for trace table: {}.",
665                                    index_column, create_table.table_name
666                                );
667                            }
668                        }
669
670                        // use table_options to mark table model version
671                        create_table.table_options.insert(
672                            TABLE_DATA_MODEL.to_string(),
673                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
674                        );
675
676                        let table = self
677                            .create_physical_table(
678                                create_table,
679                                Some(partitions),
680                                ctx,
681                                statement_executor,
682                            )
683                            .await?;
684                        let table_info = table.table_info();
685                        if table_info.is_ttl_instant_table() {
686                            instant_table_ids.insert(table_info.table_id());
687                        }
688                        table_infos.insert(table_info.table_id(), table.table_info());
689                    }
690                }
691                for alter_expr in alter_tables.into_iter() {
692                    statement_executor
693                        .alter_table_inner(alter_expr, ctx.clone())
694                        .await?;
695                }
696            }
697        }
698
699        Ok(CreateAlterTableResult {
700            instant_table_ids,
701            table_infos,
702        })
703    }
704
705    async fn create_physical_table_on_demand(
706        &self,
707        ctx: &QueryContextRef,
708        physical_table: String,
709        statement_executor: &StatementExecutor,
710    ) -> Result<()> {
711        let catalog_name = ctx.current_catalog();
712        let schema_name = ctx.current_schema();
713
714        // check if exist
715        if self
716            .get_table(catalog_name, &schema_name, &physical_table)
717            .await?
718            .is_some()
719        {
720            return Ok(());
721        }
722
723        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
724        info!("Physical metric table `{table_reference}` does not exist, try creating table");
725
726        // schema with timestamp and field column
727        let default_schema = vec![
728            ColumnSchema {
729                column_name: greptime_timestamp().to_string(),
730                datatype: ColumnDataType::TimestampMillisecond as _,
731                semantic_type: SemanticType::Timestamp as _,
732                datatype_extension: None,
733                options: None,
734            },
735            ColumnSchema {
736                column_name: greptime_value().to_string(),
737                datatype: ColumnDataType::Float64 as _,
738                semantic_type: SemanticType::Field as _,
739                datatype_extension: None,
740                options: None,
741            },
742        ];
743        let create_table_expr =
744            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
745
746        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
747        create_table_expr
748            .table_options
749            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
750
751        // create physical table
752        let res = statement_executor
753            .create_table_inner(create_table_expr, None, ctx.clone())
754            .await;
755
756        match res {
757            Ok(_) => {
758                info!("Successfully created table {table_reference}",);
759                Ok(())
760            }
761            Err(err) => {
762                error!(err; "Failed to create table {table_reference}");
763                Err(err)
764            }
765        }
766    }
767
768    async fn get_table(
769        &self,
770        catalog: &str,
771        schema: &str,
772        table: &str,
773    ) -> Result<Option<TableRef>> {
774        self.catalog_manager
775            .table(catalog, schema, table, None)
776            .await
777            .context(CatalogSnafu)
778    }
779
780    fn get_create_table_expr_on_demand(
781        &self,
782        req: &RowInsertRequest,
783        create_type: &AutoCreateTableType,
784        ctx: &QueryContextRef,
785    ) -> Result<CreateTableExpr> {
786        let mut table_options = std::collections::HashMap::with_capacity(4);
787        fill_table_options_for_create(&mut table_options, create_type, ctx);
788
789        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
790            // engine should be metric engine when creating logical tables.
791            METRIC_ENGINE_NAME
792        } else {
793            default_engine()
794        };
795
796        let schema = ctx.current_schema();
797        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
798        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
799        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
800        let mut create_table_expr =
801            build_create_table_expr(&table_ref, request_schema, engine_name)?;
802
803        info!("Table `{table_ref}` does not exist, try creating table");
804        create_table_expr.table_options.extend(table_options);
805        Ok(create_table_expr)
806    }
807
808    /// Returns an alter table expression if it finds new columns in the request.
809    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
810    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
811    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
812    /// for more details.
813    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
814    /// input `req`.
815    fn get_alter_table_expr_on_demand(
816        &self,
817        req: &mut RowInsertRequest,
818        table: &TableRef,
819        ctx: &QueryContextRef,
820        accommodate_existing_schema: bool,
821        is_single_value: bool,
822    ) -> Result<Option<AlterTableExpr>> {
823        let catalog_name = ctx.current_catalog();
824        let schema_name = ctx.current_schema();
825        let table_name = table.table_info().name.clone();
826
827        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
828        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
829        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
830        let Some(mut add_columns) = add_columns else {
831            return Ok(None);
832        };
833
834        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
835        if accommodate_existing_schema {
836            let table_schema = table.schema();
837            // Find timestamp column name
838            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
839            // Find field column name if there is only one and `is_single_value` is true.
840            let mut field_col_name = None;
841            if is_single_value {
842                let mut multiple_field_cols = false;
843                table.field_columns().for_each(|col| {
844                    if field_col_name.is_none() {
845                        field_col_name = Some(col.name.clone());
846                    } else {
847                        multiple_field_cols = true;
848                    }
849                });
850                if multiple_field_cols {
851                    field_col_name = None;
852                }
853            }
854
855            // Update column name in request schema for Timestamp/Field columns
856            if let Some(rows) = req.rows.as_mut() {
857                for col in &mut rows.schema {
858                    match col.semantic_type {
859                        x if x == SemanticType::Timestamp as i32 => {
860                            if let Some(ref ts_name) = ts_col_name
861                                && col.column_name != *ts_name
862                            {
863                                col.column_name = ts_name.clone();
864                            }
865                        }
866                        x if x == SemanticType::Field as i32 => {
867                            if let Some(ref field_name) = field_col_name
868                                && col.column_name != *field_name
869                            {
870                                col.column_name = field_name.clone();
871                            }
872                        }
873                        _ => {}
874                    }
875                }
876            }
877
878            // Only keep columns that are tags or non-single field.
879            add_columns.add_columns.retain(|col| {
880                let def = col.column_def.as_ref().unwrap();
881                def.semantic_type == SemanticType::Tag as i32
882                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
883            });
884
885            if add_columns.add_columns.is_empty() {
886                return Ok(None);
887            }
888        }
889
890        Ok(Some(AlterTableExpr {
891            catalog_name: catalog_name.to_string(),
892            schema_name: schema_name.clone(),
893            table_name: table_name.clone(),
894            kind: Some(Kind::AddColumns(add_columns)),
895        }))
896    }
897
898    /// Creates a table with options.
899    async fn create_physical_table(
900        &self,
901        mut create_table_expr: CreateTableExpr,
902        partitions: Option<Partitions>,
903        ctx: &QueryContextRef,
904        statement_executor: &StatementExecutor,
905    ) -> Result<TableRef> {
906        {
907            let table_ref = TableReference::full(
908                &create_table_expr.catalog_name,
909                &create_table_expr.schema_name,
910                &create_table_expr.table_name,
911            );
912
913            info!("Table `{table_ref}` does not exist, try creating table");
914        }
915        let res = statement_executor
916            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
917            .await;
918
919        let table_ref = TableReference::full(
920            &create_table_expr.catalog_name,
921            &create_table_expr.schema_name,
922            &create_table_expr.table_name,
923        );
924
925        match res {
926            Ok(table) => {
927                info!(
928                    "Successfully created table {} with options: {:?}",
929                    table_ref, create_table_expr.table_options,
930                );
931                Ok(table)
932            }
933            Err(err) => {
934                error!(err; "Failed to create table {}", table_ref);
935                Err(err)
936            }
937        }
938    }
939
940    async fn create_logical_tables(
941        &self,
942        create_table_exprs: Vec<CreateTableExpr>,
943        ctx: &QueryContextRef,
944        statement_executor: &StatementExecutor,
945    ) -> Result<Vec<TableRef>> {
946        let res = statement_executor
947            .create_logical_tables(&create_table_exprs, ctx.clone())
948            .await;
949
950        match res {
951            Ok(res) => {
952                info!("Successfully created logical tables");
953                Ok(res)
954            }
955            Err(err) => {
956                let failed_tables = create_table_exprs
957                    .into_iter()
958                    .map(|expr| {
959                        format!(
960                            "{}.{}.{}",
961                            expr.catalog_name, expr.schema_name, expr.table_name
962                        )
963                    })
964                    .collect::<Vec<_>>();
965                error!(
966                    err;
967                    "Failed to create logical tables {:?}",
968                    failed_tables
969                );
970                Err(err)
971            }
972        }
973    }
974
975    pub fn node_manager(&self) -> &NodeManagerRef {
976        &self.node_manager
977    }
978
979    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
980        &self.partition_manager
981    }
982}
983
984fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
985    for request in &requests.inserts {
986        let rows = request.rows.as_ref().unwrap();
987        let column_count = rows.schema.len();
988        rows.rows.iter().try_for_each(|r| {
989            ensure!(
990                r.values.len() == column_count,
991                InvalidInsertRequestSnafu {
992                    reason: format!(
993                        "column count mismatch, columns: {}, values: {}",
994                        column_count,
995                        r.values.len()
996                    )
997                }
998            );
999            Ok(())
1000        })?;
1001    }
1002    Ok(())
1003}
1004
1005/// Fill table options for a new table by create type.
1006pub fn fill_table_options_for_create(
1007    table_options: &mut std::collections::HashMap<String, String>,
1008    create_type: &AutoCreateTableType,
1009    ctx: &QueryContextRef,
1010) {
1011    for key in VALID_TABLE_OPTION_KEYS {
1012        if let Some(value) = ctx.extension(key) {
1013            table_options.insert(key.to_string(), value.to_string());
1014        }
1015    }
1016
1017    match create_type {
1018        AutoCreateTableType::Logical(physical_table) => {
1019            table_options.insert(
1020                LOGICAL_TABLE_METADATA_KEY.to_string(),
1021                physical_table.clone(),
1022            );
1023        }
1024        AutoCreateTableType::Physical => {
1025            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1026                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1027            }
1028            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1029                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1030            }
1031            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1032                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1033                // We need to set the compaction type explicitly.
1034                table_options.insert(
1035                    COMPACTION_TYPE.to_string(),
1036                    COMPACTION_TYPE_TWCS.to_string(),
1037                );
1038            }
1039        }
1040        // Set append_mode to true for log table.
1041        // because log tables should keep rows with the same ts and tags.
1042        AutoCreateTableType::Log => {
1043            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1044        }
1045        AutoCreateTableType::LastNonNull => {
1046            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1047        }
1048        AutoCreateTableType::Trace => {
1049            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1050        }
1051    }
1052}
1053
1054pub fn build_create_table_expr(
1055    table: &TableReference,
1056    request_schema: &[ColumnSchema],
1057    engine: &str,
1058) -> Result<CreateTableExpr> {
1059    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1060}
1061
1062/// Result of `create_or_alter_tables_on_demand`.
1063struct CreateAlterTableResult {
1064    /// table ids of ttl=instant tables.
1065    instant_table_ids: HashSet<TableId>,
1066    /// Table Info of the created tables.
1067    table_infos: HashMap<TableId, Arc<TableInfo>>,
1068}
1069
1070struct FlowMirrorTask {
1071    requests: HashMap<Peer, RegionInsertRequests>,
1072    num_rows: usize,
1073}
1074
1075impl FlowMirrorTask {
1076    async fn new(
1077        cache: &TableFlownodeSetCacheRef,
1078        requests: impl Iterator<Item = &RegionInsertRequest>,
1079    ) -> Result<Self> {
1080        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1081            HashMap::new();
1082        let mut num_rows = 0;
1083
1084        for req in requests {
1085            let table_id = RegionId::from_u64(req.region_id).table_id();
1086            match src_table_reqs.get_mut(&table_id) {
1087                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1088                // already know this is not source table
1089                Some(None) => continue,
1090                _ => {
1091                    // dedup peers
1092                    let peers = cache
1093                        .get(table_id)
1094                        .await
1095                        .context(RequestInsertsSnafu)?
1096                        .unwrap_or_default()
1097                        .values()
1098                        .cloned()
1099                        .collect::<HashSet<_>>()
1100                        .into_iter()
1101                        .collect::<Vec<_>>();
1102
1103                    if !peers.is_empty() {
1104                        let mut reqs = RegionInsertRequests::default();
1105                        reqs.requests.push(req.clone());
1106                        num_rows += reqs
1107                            .requests
1108                            .iter()
1109                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1110                            .sum::<usize>();
1111                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1112                    } else {
1113                        // insert a empty entry to avoid repeat query
1114                        src_table_reqs.insert(table_id, None);
1115                    }
1116                }
1117            }
1118        }
1119
1120        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1121
1122        for (_table_id, (peers, reqs)) in src_table_reqs
1123            .into_iter()
1124            .filter_map(|(k, v)| v.map(|v| (k, v)))
1125        {
1126            if peers.len() == 1 {
1127                // fast path, zero copy
1128                inserts
1129                    .entry(peers[0].clone())
1130                    .or_default()
1131                    .requests
1132                    .extend(reqs.requests);
1133                continue;
1134            } else {
1135                // TODO(discord9): need to split requests to multiple flownodes
1136                for flownode in peers {
1137                    inserts
1138                        .entry(flownode.clone())
1139                        .or_default()
1140                        .requests
1141                        .extend(reqs.requests.clone());
1142                }
1143            }
1144        }
1145
1146        Ok(Self {
1147            requests: inserts,
1148            num_rows,
1149        })
1150    }
1151
1152    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1153        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1154        for (peer, inserts) in self.requests {
1155            let node_manager = node_manager.clone();
1156            common_runtime::spawn_global(async move {
1157                let result = node_manager
1158                    .flownode(&peer)
1159                    .await
1160                    .handle_inserts(inserts)
1161                    .await
1162                    .context(RequestInsertsSnafu);
1163
1164                match result {
1165                    Ok(resp) => {
1166                        let affected_rows = resp.affected_rows;
1167                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1168                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1169                    }
1170                    Err(err) => {
1171                        error!(err; "Failed to insert data into flownode {}", peer);
1172                    }
1173                }
1174            });
1175        }
1176
1177        Ok(())
1178    }
1179}
1180
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds a table named `test_table` in the default catalog/schema whose schema is a
    /// millisecond time index called `ts_name` and one nullable float64 column called
    /// `field_name`, backed by a dummy data source.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let time_index = ColumnSchema::new(
            ts_name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field = ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true);

        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![time_index, field])
            .unwrap()
            .build()
            .unwrap();

        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(vec![0])
            .build()
            .unwrap();

        let info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();

        Arc::new(table::Table::new(
            Arc::new(info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// With `accommodate_existing_schema` and `is_single_value` both set, a request whose
    /// timestamp and field columns are named differently from the table's must be renamed
    /// in place and must not produce an alter expression.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request uses different names for timestamp and field columns
        let rows = Rows {
            schema: vec![
                time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                field_column_schema("field_wrong", ColumnDataType::Float64),
            ],
            rows: vec![api::v1::Row {
                values: vec![Value::default(), Value::default()],
            }],
        };
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(rows),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        let kv_backend = prepare_mocked_backend().await;
        let flownode_cache = Arc::new(new_table_flownode_set_cache(
            String::new(),
            Cache::new(100),
            kv_backend.clone(),
        ));
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            flownode_cache,
        );

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request's schema should have updated names for timestamp and field columns
        let renamed: Vec<&str> = req
            .rows
            .as_ref()
            .unwrap()
            .schema
            .iter()
            .map(|col| col.column_name.as_str())
            .collect();
        assert_eq!(renamed, [ts_name, field_name]);
    }
}
1289        assert_eq!(req_schema[1].column_name, field_name);
1290    }
1291}