operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58    TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
/// Entry point for writing rows into tables.
///
/// It turns incoming insert requests into region-level requests, sends them to
/// the datanodes that lead the target regions, and hands them to a detached
/// flow mirror task.
pub struct Inserter {
    /// Used to look tables up by catalog, schema and table name.
    catalog_manager: CatalogManagerRef,
    /// Used to split rows into region requests and to find the leader peer of a region.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    /// Provides the datanode clients that region insert requests are sent to.
    pub(crate) node_manager: NodeManagerRef,
    /// Cache passed to `FlowMirrorTask` when mirroring requests.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}

/// Shared, reference-counted handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
89
/// Hint for the kind of table to create automatically when the target table
/// of an insert does not exist yet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AutoCreateTableType {
    /// A logical table; the payload is the name of its physical table.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows by `last_non_null` strategy.
    LastNonNull,
    /// A trace table, created with indexes and default partition rules on the trace id.
    Trace,
}

impl AutoCreateTableType {
    /// Returns a short, static, lowercase name for the variant.
    ///
    /// The name ignores the payload of [`AutoCreateTableType::Logical`], so it
    /// is safe to use as a low-cardinality label.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
116
117/// Split insert requests into normal and instant requests.
118///
119/// Where instant requests are requests with ttl=instant,
120/// and normal requests are requests with ttl set to other values.
121///
122/// This is used to split requests for different processing.
123#[derive(Clone)]
124pub struct InstantAndNormalInsertRequests {
125    /// Requests with normal ttl.
126    pub normal_requests: RegionInsertRequests,
127    /// Requests with ttl=instant.
128    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
129    pub instant_requests: RegionInsertRequests,
130}
131
132impl Inserter {
133    pub fn new(
134        catalog_manager: CatalogManagerRef,
135        partition_manager: PartitionRuleManagerRef,
136        node_manager: NodeManagerRef,
137        table_flownode_set_cache: TableFlownodeSetCacheRef,
138    ) -> Self {
139        Self {
140            catalog_manager,
141            partition_manager,
142            node_manager,
143            table_flownode_set_cache,
144        }
145    }
146
147    pub async fn handle_column_inserts(
148        &self,
149        requests: InsertRequests,
150        ctx: QueryContextRef,
151        statement_executor: &StatementExecutor,
152    ) -> Result<Output> {
153        let row_inserts = ColumnToRow::convert(requests)?;
154        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
155            .await
156    }
157
158    /// Handles row inserts request and creates a physical table on demand.
159    pub async fn handle_row_inserts(
160        &self,
161        mut requests: RowInsertRequests,
162        ctx: QueryContextRef,
163        statement_executor: &StatementExecutor,
164        accommodate_existing_schema: bool,
165        is_single_value: bool,
166    ) -> Result<Output> {
167        preprocess_row_insert_requests(&mut requests.inserts)?;
168        self.handle_row_inserts_with_create_type(
169            requests,
170            ctx,
171            statement_executor,
172            AutoCreateTableType::Physical,
173            accommodate_existing_schema,
174            is_single_value,
175        )
176        .await
177    }
178
179    /// Handles row inserts request and creates a log table on demand.
180    pub async fn handle_log_inserts(
181        &self,
182        requests: RowInsertRequests,
183        ctx: QueryContextRef,
184        statement_executor: &StatementExecutor,
185    ) -> Result<Output> {
186        self.handle_row_inserts_with_create_type(
187            requests,
188            ctx,
189            statement_executor,
190            AutoCreateTableType::Log,
191            false,
192            false,
193        )
194        .await
195    }
196
197    pub async fn handle_trace_inserts(
198        &self,
199        requests: RowInsertRequests,
200        ctx: QueryContextRef,
201        statement_executor: &StatementExecutor,
202    ) -> Result<Output> {
203        self.handle_row_inserts_with_create_type(
204            requests,
205            ctx,
206            statement_executor,
207            AutoCreateTableType::Trace,
208            false,
209            false,
210        )
211        .await
212    }
213
214    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
215    pub async fn handle_last_non_null_inserts(
216        &self,
217        requests: RowInsertRequests,
218        ctx: QueryContextRef,
219        statement_executor: &StatementExecutor,
220        accommodate_existing_schema: bool,
221        is_single_value: bool,
222    ) -> Result<Output> {
223        self.handle_row_inserts_with_create_type(
224            requests,
225            ctx,
226            statement_executor,
227            AutoCreateTableType::LastNonNull,
228            accommodate_existing_schema,
229            is_single_value,
230        )
231        .await
232    }
233
234    /// Handles row inserts request with specified [AutoCreateTableType].
235    async fn handle_row_inserts_with_create_type(
236        &self,
237        mut requests: RowInsertRequests,
238        ctx: QueryContextRef,
239        statement_executor: &StatementExecutor,
240        create_type: AutoCreateTableType,
241        accommodate_existing_schema: bool,
242        is_single_value: bool,
243    ) -> Result<Output> {
244        // remove empty requests
245        requests.inserts.retain(|req| {
246            req.rows
247                .as_ref()
248                .map(|r| !r.rows.is_empty())
249                .unwrap_or_default()
250        });
251        validate_column_count_match(&requests)?;
252
253        let CreateAlterTableResult {
254            instant_table_ids,
255            table_infos,
256        } = self
257            .create_or_alter_tables_on_demand(
258                &mut requests,
259                &ctx,
260                create_type,
261                statement_executor,
262                accommodate_existing_schema,
263                is_single_value,
264            )
265            .await?;
266
267        let name_to_info = table_infos
268            .values()
269            .map(|info| (info.name.clone(), info.clone()))
270            .collect::<HashMap<_, _>>();
271        let inserts = RowToRegion::new(
272            name_to_info,
273            instant_table_ids,
274            self.partition_manager.as_ref(),
275        )
276        .convert(requests)
277        .await?;
278
279        self.do_request(inserts, &table_infos, &ctx).await
280    }
281
282    /// Handles row inserts request with metric engine.
283    pub async fn handle_metric_row_inserts(
284        &self,
285        mut requests: RowInsertRequests,
286        ctx: QueryContextRef,
287        statement_executor: &StatementExecutor,
288        physical_table: String,
289    ) -> Result<Output> {
290        // remove empty requests
291        requests.inserts.retain(|req| {
292            req.rows
293                .as_ref()
294                .map(|r| !r.rows.is_empty())
295                .unwrap_or_default()
296        });
297        validate_column_count_match(&requests)?;
298
299        // check and create physical table
300        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
301            .await?;
302
303        // check and create logical tables
304        let CreateAlterTableResult {
305            instant_table_ids,
306            table_infos,
307        } = self
308            .create_or_alter_tables_on_demand(
309                &mut requests,
310                &ctx,
311                AutoCreateTableType::Logical(physical_table.clone()),
312                statement_executor,
313                true,
314                true,
315            )
316            .await?;
317        let name_to_info = table_infos
318            .values()
319            .map(|info| (info.name.clone(), info.clone()))
320            .collect::<HashMap<_, _>>();
321        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
322            .convert(requests)
323            .await?;
324
325        self.do_request(inserts, &table_infos, &ctx).await
326    }
327
328    pub async fn handle_table_insert(
329        &self,
330        request: TableInsertRequest,
331        ctx: QueryContextRef,
332    ) -> Result<Output> {
333        let catalog = request.catalog_name.as_str();
334        let schema = request.schema_name.as_str();
335        let table_name = request.table_name.as_str();
336        let table = self.get_table(catalog, schema, table_name).await?;
337        let table = table.with_context(|| TableNotFoundSnafu {
338            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
339        })?;
340        let table_info = table.table_info();
341
342        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
343            .convert(request)
344            .await?;
345
346        let table_infos =
347            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
348
349        self.do_request(inserts, &table_infos, &ctx).await
350    }
351
352    pub async fn handle_statement_insert(
353        &self,
354        insert: &Insert,
355        ctx: &QueryContextRef,
356    ) -> Result<Output> {
357        let (inserts, table_info) =
358            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
359                .convert(insert, ctx)
360                .await?;
361
362        let table_infos =
363            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
364
365        self.do_request(inserts, &table_infos, ctx).await
366    }
367}
368
369impl Inserter {
370    async fn do_request(
371        &self,
372        requests: InstantAndNormalInsertRequests,
373        table_infos: &HashMap<TableId, Arc<TableInfo>>,
374        ctx: &QueryContextRef,
375    ) -> Result<Output> {
376        // Fill impure default values in the request
377        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
378
379        let write_cost = write_meter!(
380            ctx.current_catalog(),
381            ctx.current_schema(),
382            requests,
383            ctx.channel() as u8
384        );
385        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
386            tracing_context: TracingContext::from_current_span().to_w3c(),
387            dbname: ctx.get_db_string(),
388            ..Default::default()
389        });
390
391        let InstantAndNormalInsertRequests {
392            normal_requests,
393            instant_requests,
394        } = requests;
395
396        // Mirror requests for source table to flownode asynchronously
397        let flow_mirror_task = FlowMirrorTask::new(
398            &self.table_flownode_set_cache,
399            normal_requests
400                .requests
401                .iter()
402                .chain(instant_requests.requests.iter()),
403        )
404        .await?;
405        flow_mirror_task.detach(self.node_manager.clone())?;
406
407        // Write requests to datanode and wait for response
408        let write_tasks = self
409            .group_requests_by_peer(normal_requests)
410            .await?
411            .into_iter()
412            .map(|(peer, inserts)| {
413                let node_manager = self.node_manager.clone();
414                let request = request_factory.build_insert(inserts);
415                common_runtime::spawn_global(async move {
416                    node_manager
417                        .datanode(&peer)
418                        .await
419                        .handle(request)
420                        .await
421                        .context(RequestInsertsSnafu)
422                })
423            });
424        let results = future::try_join_all(write_tasks)
425            .await
426            .context(JoinTaskSnafu)?;
427        let affected_rows = results
428            .into_iter()
429            .map(|resp| resp.map(|r| r.affected_rows))
430            .sum::<Result<AffectedRows>>()?;
431        crate::metrics::DIST_INGEST_ROW_COUNT
432            .with_label_values(&[ctx.get_db_string().as_str()])
433            .inc_by(affected_rows as u64);
434        Ok(Output::new(
435            OutputData::AffectedRows(affected_rows),
436            OutputMeta::new_with_cost(write_cost as _),
437        ))
438    }
439
440    async fn group_requests_by_peer(
441        &self,
442        requests: RegionInsertRequests,
443    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
444        // group by region ids first to reduce repeatedly call `find_region_leader`
445        // TODO(discord9): determine if a addition clone is worth it
446        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
447        for req in requests.requests {
448            let region_id = RegionId::from_u64(req.region_id);
449            requests_per_region
450                .entry(region_id)
451                .or_default()
452                .requests
453                .push(req);
454        }
455
456        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
457
458        for (region_id, reqs) in requests_per_region {
459            let peer = self
460                .partition_manager
461                .find_region_leader(region_id)
462                .await
463                .context(FindRegionLeaderSnafu)?;
464            inserts
465                .entry(peer)
466                .or_default()
467                .requests
468                .extend(reqs.requests);
469        }
470
471        Ok(inserts)
472    }
473
474    /// Creates or alter tables on demand:
475    /// - if table does not exist, create table by inferred CreateExpr
476    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
477    ///
478    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
479    /// This mapping is used in the conversion of RowToRegion.
480    ///
481    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
482    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
483    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
484    /// remote write. This will modify the `RowInsertRequests` in place.
485    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
486    async fn create_or_alter_tables_on_demand(
487        &self,
488        requests: &mut RowInsertRequests,
489        ctx: &QueryContextRef,
490        auto_create_table_type: AutoCreateTableType,
491        statement_executor: &StatementExecutor,
492        accommodate_existing_schema: bool,
493        is_single_value: bool,
494    ) -> Result<CreateAlterTableResult> {
495        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
496            .with_label_values(&[auto_create_table_type.as_str()])
497            .start_timer();
498
499        let catalog = ctx.current_catalog();
500        let schema = ctx.current_schema();
501
502        let mut table_infos = HashMap::new();
503        // If `auto_create_table` hint is disabled, skip creating/altering tables.
504        let auto_create_table_hint = ctx
505            .extension(AUTO_CREATE_TABLE_KEY)
506            .map(|v| v.parse::<bool>())
507            .transpose()
508            .map_err(|_| {
509                InvalidInsertRequestSnafu {
510                    reason: "`auto_create_table` hint must be a boolean",
511                }
512                .build()
513            })?
514            .unwrap_or(true);
515        if !auto_create_table_hint {
516            let mut instant_table_ids = HashSet::new();
517            for req in &requests.inserts {
518                let table = self
519                    .get_table(catalog, &schema, &req.table_name)
520                    .await?
521                    .context(InvalidInsertRequestSnafu {
522                        reason: format!(
523                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
524                            req.table_name
525                        ),
526                    })?;
527                let table_info = table.table_info();
528                if table_info.is_ttl_instant_table() {
529                    instant_table_ids.insert(table_info.table_id());
530                }
531                table_infos.insert(table_info.table_id(), table.table_info());
532            }
533            let ret = CreateAlterTableResult {
534                instant_table_ids,
535                table_infos,
536            };
537            return Ok(ret);
538        }
539
540        let mut create_tables = vec![];
541        let mut alter_tables = vec![];
542        let mut instant_table_ids = HashSet::new();
543
544        for req in &mut requests.inserts {
545            match self.get_table(catalog, &schema, &req.table_name).await? {
546                Some(table) => {
547                    let table_info = table.table_info();
548                    if table_info.is_ttl_instant_table() {
549                        instant_table_ids.insert(table_info.table_id());
550                    }
551                    table_infos.insert(table_info.table_id(), table.table_info());
552                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
553                        req,
554                        &table,
555                        ctx,
556                        accommodate_existing_schema,
557                        is_single_value,
558                    )? {
559                        alter_tables.push(alter_expr);
560                    }
561                }
562                None => {
563                    let create_expr =
564                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
565                    create_tables.push(create_expr);
566                }
567            }
568        }
569
570        match auto_create_table_type {
571            AutoCreateTableType::Logical(_) => {
572                if !create_tables.is_empty() {
573                    // Creates logical tables in batch.
574                    let tables = self
575                        .create_logical_tables(create_tables, ctx, statement_executor)
576                        .await?;
577
578                    for table in tables {
579                        let table_info = table.table_info();
580                        if table_info.is_ttl_instant_table() {
581                            instant_table_ids.insert(table_info.table_id());
582                        }
583                        table_infos.insert(table_info.table_id(), table.table_info());
584                    }
585                }
586                if !alter_tables.is_empty() {
587                    // Alter logical tables in batch.
588                    statement_executor
589                        .alter_logical_tables(alter_tables, ctx.clone())
590                        .await?;
591                }
592            }
593            AutoCreateTableType::Physical
594            | AutoCreateTableType::Log
595            | AutoCreateTableType::LastNonNull => {
596                // note that auto create table shouldn't be ttl instant table
597                // for it's a very unexpected behavior and should be set by user explicitly
598                for create_table in create_tables {
599                    let table = self
600                        .create_physical_table(create_table, None, ctx, statement_executor)
601                        .await?;
602                    let table_info = table.table_info();
603                    if table_info.is_ttl_instant_table() {
604                        instant_table_ids.insert(table_info.table_id());
605                    }
606                    table_infos.insert(table_info.table_id(), table.table_info());
607                }
608                for alter_expr in alter_tables.into_iter() {
609                    statement_executor
610                        .alter_table_inner(alter_expr, ctx.clone())
611                        .await?;
612                }
613            }
614
615            AutoCreateTableType::Trace => {
616                let trace_table_name = ctx
617                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
618                    .unwrap_or(TRACE_TABLE_NAME);
619
620                // note that auto create table shouldn't be ttl instant table
621                // for it's a very unexpected behavior and should be set by user explicitly
622                for mut create_table in create_tables {
623                    if create_table.table_name == trace_services_table_name(trace_table_name)
624                        || create_table.table_name == trace_operations_table_name(trace_table_name)
625                    {
626                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
627                        create_table
628                            .table_options
629                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
630                        // Remove `ttl` key from table options if it exists
631                        create_table.table_options.remove(TTL_KEY);
632
633                        let table = self
634                            .create_physical_table(create_table, None, ctx, statement_executor)
635                            .await?;
636                        let table_info = table.table_info();
637                        if table_info.is_ttl_instant_table() {
638                            instant_table_ids.insert(table_info.table_id());
639                        }
640                        table_infos.insert(table_info.table_id(), table.table_info());
641                    } else {
642                        // prebuilt partition rules for uuid data: see the function
643                        // for more information
644                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
645                            .context(CreatePartitionRulesSnafu)?;
646                        // add skip index to
647                        // - trace_id: when searching by trace id
648                        // - parent_span_id: when searching root span
649                        // - span_name: when searching certain types of span
650                        let index_columns =
651                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
652                        for index_column in index_columns {
653                            if let Some(col) = create_table
654                                .column_defs
655                                .iter_mut()
656                                .find(|c| c.name == index_column)
657                            {
658                                col.options =
659                                    options_from_skipping(&SkippingIndexOptions::default())
660                                        .context(ColumnOptionsSnafu)?;
661                            } else {
662                                warn!(
663                                    "Column {} not found when creating index for trace table: {}.",
664                                    index_column, create_table.table_name
665                                );
666                            }
667                        }
668
669                        // use table_options to mark table model version
670                        create_table.table_options.insert(
671                            TABLE_DATA_MODEL.to_string(),
672                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
673                        );
674
675                        let table = self
676                            .create_physical_table(
677                                create_table,
678                                Some(partitions),
679                                ctx,
680                                statement_executor,
681                            )
682                            .await?;
683                        let table_info = table.table_info();
684                        if table_info.is_ttl_instant_table() {
685                            instant_table_ids.insert(table_info.table_id());
686                        }
687                        table_infos.insert(table_info.table_id(), table.table_info());
688                    }
689                }
690                for alter_expr in alter_tables.into_iter() {
691                    statement_executor
692                        .alter_table_inner(alter_expr, ctx.clone())
693                        .await?;
694                }
695            }
696        }
697
698        Ok(CreateAlterTableResult {
699            instant_table_ids,
700            table_infos,
701        })
702    }
703
704    async fn create_physical_table_on_demand(
705        &self,
706        ctx: &QueryContextRef,
707        physical_table: String,
708        statement_executor: &StatementExecutor,
709    ) -> Result<()> {
710        let catalog_name = ctx.current_catalog();
711        let schema_name = ctx.current_schema();
712
713        // check if exist
714        if self
715            .get_table(catalog_name, &schema_name, &physical_table)
716            .await?
717            .is_some()
718        {
719            return Ok(());
720        }
721
722        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
723        info!("Physical metric table `{table_reference}` does not exist, try creating table");
724
725        // schema with timestamp and field column
726        let default_schema = vec![
727            ColumnSchema {
728                column_name: greptime_timestamp().to_string(),
729                datatype: ColumnDataType::TimestampMillisecond as _,
730                semantic_type: SemanticType::Timestamp as _,
731                datatype_extension: None,
732                options: None,
733            },
734            ColumnSchema {
735                column_name: greptime_value().to_string(),
736                datatype: ColumnDataType::Float64 as _,
737                semantic_type: SemanticType::Field as _,
738                datatype_extension: None,
739                options: None,
740            },
741        ];
742        let create_table_expr =
743            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
744
745        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
746        create_table_expr
747            .table_options
748            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
749
750        // create physical table
751        let res = statement_executor
752            .create_table_inner(create_table_expr, None, ctx.clone())
753            .await;
754
755        match res {
756            Ok(_) => {
757                info!("Successfully created table {table_reference}",);
758                Ok(())
759            }
760            Err(err) => {
761                error!(err; "Failed to create table {table_reference}");
762                Err(err)
763            }
764        }
765    }
766
767    async fn get_table(
768        &self,
769        catalog: &str,
770        schema: &str,
771        table: &str,
772    ) -> Result<Option<TableRef>> {
773        self.catalog_manager
774            .table(catalog, schema, table, None)
775            .await
776            .context(CatalogSnafu)
777    }
778
779    fn get_create_table_expr_on_demand(
780        &self,
781        req: &RowInsertRequest,
782        create_type: &AutoCreateTableType,
783        ctx: &QueryContextRef,
784    ) -> Result<CreateTableExpr> {
785        let mut table_options = std::collections::HashMap::with_capacity(4);
786        fill_table_options_for_create(&mut table_options, create_type, ctx);
787
788        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
789            // engine should be metric engine when creating logical tables.
790            METRIC_ENGINE_NAME
791        } else {
792            default_engine()
793        };
794
795        let schema = ctx.current_schema();
796        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
797        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
798        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
799        let mut create_table_expr =
800            build_create_table_expr(&table_ref, request_schema, engine_name)?;
801
802        info!("Table `{table_ref}` does not exist, try creating table");
803        create_table_expr.table_options.extend(table_options);
804        Ok(create_table_expr)
805    }
806
807    /// Returns an alter table expression if it finds new columns in the request.
808    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
809    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
810    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
811    /// for more details.
812    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
813    /// input `req`.
814    fn get_alter_table_expr_on_demand(
815        &self,
816        req: &mut RowInsertRequest,
817        table: &TableRef,
818        ctx: &QueryContextRef,
819        accommodate_existing_schema: bool,
820        is_single_value: bool,
821    ) -> Result<Option<AlterTableExpr>> {
822        let catalog_name = ctx.current_catalog();
823        let schema_name = ctx.current_schema();
824        let table_name = table.table_info().name.clone();
825
826        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
827        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
828        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
829        let Some(mut add_columns) = add_columns else {
830            return Ok(None);
831        };
832
833        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
834        if accommodate_existing_schema {
835            let table_schema = table.schema();
836            // Find timestamp column name
837            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
838            // Find field column name if there is only one and `is_single_value` is true.
839            let mut field_col_name = None;
840            if is_single_value {
841                let mut multiple_field_cols = false;
842                table.field_columns().for_each(|col| {
843                    if field_col_name.is_none() {
844                        field_col_name = Some(col.name.clone());
845                    } else {
846                        multiple_field_cols = true;
847                    }
848                });
849                if multiple_field_cols {
850                    field_col_name = None;
851                }
852            }
853
854            // Update column name in request schema for Timestamp/Field columns
855            if let Some(rows) = req.rows.as_mut() {
856                for col in &mut rows.schema {
857                    match col.semantic_type {
858                        x if x == SemanticType::Timestamp as i32 => {
859                            if let Some(ref ts_name) = ts_col_name
860                                && col.column_name != *ts_name
861                            {
862                                col.column_name = ts_name.clone();
863                            }
864                        }
865                        x if x == SemanticType::Field as i32 => {
866                            if let Some(ref field_name) = field_col_name
867                                && col.column_name != *field_name
868                            {
869                                col.column_name = field_name.clone();
870                            }
871                        }
872                        _ => {}
873                    }
874                }
875            }
876
877            // Only keep columns that are tags or non-single field.
878            add_columns.add_columns.retain(|col| {
879                let def = col.column_def.as_ref().unwrap();
880                def.semantic_type == SemanticType::Tag as i32
881                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
882            });
883
884            if add_columns.add_columns.is_empty() {
885                return Ok(None);
886            }
887        }
888
889        Ok(Some(AlterTableExpr {
890            catalog_name: catalog_name.to_string(),
891            schema_name: schema_name.clone(),
892            table_name: table_name.clone(),
893            kind: Some(Kind::AddColumns(add_columns)),
894        }))
895    }
896
897    /// Creates a table with options.
898    async fn create_physical_table(
899        &self,
900        mut create_table_expr: CreateTableExpr,
901        partitions: Option<Partitions>,
902        ctx: &QueryContextRef,
903        statement_executor: &StatementExecutor,
904    ) -> Result<TableRef> {
905        {
906            let table_ref = TableReference::full(
907                &create_table_expr.catalog_name,
908                &create_table_expr.schema_name,
909                &create_table_expr.table_name,
910            );
911
912            info!("Table `{table_ref}` does not exist, try creating table");
913        }
914        let res = statement_executor
915            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
916            .await;
917
918        let table_ref = TableReference::full(
919            &create_table_expr.catalog_name,
920            &create_table_expr.schema_name,
921            &create_table_expr.table_name,
922        );
923
924        match res {
925            Ok(table) => {
926                info!(
927                    "Successfully created table {} with options: {:?}",
928                    table_ref, create_table_expr.table_options,
929                );
930                Ok(table)
931            }
932            Err(err) => {
933                error!(err; "Failed to create table {}", table_ref);
934                Err(err)
935            }
936        }
937    }
938
939    async fn create_logical_tables(
940        &self,
941        create_table_exprs: Vec<CreateTableExpr>,
942        ctx: &QueryContextRef,
943        statement_executor: &StatementExecutor,
944    ) -> Result<Vec<TableRef>> {
945        let res = statement_executor
946            .create_logical_tables(&create_table_exprs, ctx.clone())
947            .await;
948
949        match res {
950            Ok(res) => {
951                info!("Successfully created logical tables");
952                Ok(res)
953            }
954            Err(err) => {
955                let failed_tables = create_table_exprs
956                    .into_iter()
957                    .map(|expr| {
958                        format!(
959                            "{}.{}.{}",
960                            expr.catalog_name, expr.schema_name, expr.table_name
961                        )
962                    })
963                    .collect::<Vec<_>>();
964                error!(
965                    err;
966                    "Failed to create logical tables {:?}",
967                    failed_tables
968                );
969                Err(err)
970            }
971        }
972    }
973
974    pub fn node_manager(&self) -> &NodeManagerRef {
975        &self.node_manager
976    }
977
978    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
979        &self.partition_manager
980    }
981}
982
983fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
984    for request in &requests.inserts {
985        let rows = request.rows.as_ref().unwrap();
986        let column_count = rows.schema.len();
987        rows.rows.iter().try_for_each(|r| {
988            ensure!(
989                r.values.len() == column_count,
990                InvalidInsertRequestSnafu {
991                    reason: format!(
992                        "column count mismatch, columns: {}, values: {}",
993                        column_count,
994                        r.values.len()
995                    )
996                }
997            );
998            Ok(())
999        })?;
1000    }
1001    Ok(())
1002}
1003
1004/// Fill table options for a new table by create type.
1005pub fn fill_table_options_for_create(
1006    table_options: &mut std::collections::HashMap<String, String>,
1007    create_type: &AutoCreateTableType,
1008    ctx: &QueryContextRef,
1009) {
1010    for key in VALID_TABLE_OPTION_KEYS {
1011        if let Some(value) = ctx.extension(key) {
1012            table_options.insert(key.to_string(), value.to_string());
1013        }
1014    }
1015
1016    match create_type {
1017        AutoCreateTableType::Logical(physical_table) => {
1018            table_options.insert(
1019                LOGICAL_TABLE_METADATA_KEY.to_string(),
1020                physical_table.clone(),
1021            );
1022        }
1023        AutoCreateTableType::Physical => {
1024            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1025                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1026            }
1027            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1028                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1029            }
1030            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1031                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1032                // We need to set the compaction type explicitly.
1033                table_options.insert(
1034                    COMPACTION_TYPE.to_string(),
1035                    COMPACTION_TYPE_TWCS.to_string(),
1036                );
1037            }
1038        }
1039        // Set append_mode to true for log table.
1040        // because log tables should keep rows with the same ts and tags.
1041        AutoCreateTableType::Log => {
1042            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1043        }
1044        AutoCreateTableType::LastNonNull => {
1045            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1046        }
1047        AutoCreateTableType::Trace => {
1048            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1049        }
1050    }
1051}
1052
1053pub fn build_create_table_expr(
1054    table: &TableReference,
1055    request_schema: &[ColumnSchema],
1056    engine: &str,
1057) -> Result<CreateTableExpr> {
1058    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1059}
1060
1061/// Result of `create_or_alter_tables_on_demand`.
1062struct CreateAlterTableResult {
1063    /// table ids of ttl=instant tables.
1064    instant_table_ids: HashSet<TableId>,
1065    /// Table Info of the created tables.
1066    table_infos: HashMap<TableId, Arc<TableInfo>>,
1067}
1068
1069struct FlowMirrorTask {
1070    requests: HashMap<Peer, RegionInsertRequests>,
1071    num_rows: usize,
1072}
1073
1074impl FlowMirrorTask {
1075    async fn new(
1076        cache: &TableFlownodeSetCacheRef,
1077        requests: impl Iterator<Item = &RegionInsertRequest>,
1078    ) -> Result<Self> {
1079        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1080            HashMap::new();
1081        let mut num_rows = 0;
1082
1083        for req in requests {
1084            let table_id = RegionId::from_u64(req.region_id).table_id();
1085            match src_table_reqs.get_mut(&table_id) {
1086                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1087                // already know this is not source table
1088                Some(None) => continue,
1089                _ => {
1090                    // dedup peers
1091                    let peers = cache
1092                        .get(table_id)
1093                        .await
1094                        .context(RequestInsertsSnafu)?
1095                        .unwrap_or_default()
1096                        .values()
1097                        .cloned()
1098                        .collect::<HashSet<_>>()
1099                        .into_iter()
1100                        .collect::<Vec<_>>();
1101
1102                    if !peers.is_empty() {
1103                        let mut reqs = RegionInsertRequests::default();
1104                        reqs.requests.push(req.clone());
1105                        num_rows += reqs
1106                            .requests
1107                            .iter()
1108                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1109                            .sum::<usize>();
1110                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1111                    } else {
1112                        // insert a empty entry to avoid repeat query
1113                        src_table_reqs.insert(table_id, None);
1114                    }
1115                }
1116            }
1117        }
1118
1119        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1120
1121        for (_table_id, (peers, reqs)) in src_table_reqs
1122            .into_iter()
1123            .filter_map(|(k, v)| v.map(|v| (k, v)))
1124        {
1125            if peers.len() == 1 {
1126                // fast path, zero copy
1127                inserts
1128                    .entry(peers[0].clone())
1129                    .or_default()
1130                    .requests
1131                    .extend(reqs.requests);
1132                continue;
1133            } else {
1134                // TODO(discord9): need to split requests to multiple flownodes
1135                for flownode in peers {
1136                    inserts
1137                        .entry(flownode.clone())
1138                        .or_default()
1139                        .requests
1140                        .extend(reqs.requests.clone());
1141                }
1142            }
1143        }
1144
1145        Ok(Self {
1146            requests: inserts,
1147            num_rows,
1148        })
1149    }
1150
1151    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1152        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1153        for (peer, inserts) in self.requests {
1154            let node_manager = node_manager.clone();
1155            common_runtime::spawn_global(async move {
1156                let result = node_manager
1157                    .flownode(&peer)
1158                    .await
1159                    .handle_inserts(inserts)
1160                    .await
1161                    .context(RequestInsertsSnafu);
1162
1163                match result {
1164                    Ok(resp) => {
1165                        let affected_rows = resp.affected_rows;
1166                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1167                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1168                    }
1169                    Err(err) => {
1170                        error!(err; "Failed to insert data into flownode {}", peer);
1171                    }
1172                }
1173            });
1174        }
1175
1176        Ok(())
1177    }
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182    use std::sync::Arc;
1183
1184    use api::v1::helper::{field_column_schema, time_index_column_schema};
1185    use api::v1::{RowInsertRequest, Rows, Value};
1186    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
1187    use common_meta::cache::new_table_flownode_set_cache;
1188    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
1189    use common_meta::test_util::MockDatanodeManager;
1190    use datatypes::data_type::ConcreteDataType;
1191    use datatypes::schema::ColumnSchema;
1192    use moka::future::Cache;
1193    use session::context::QueryContext;
1194    use table::TableRef;
1195    use table::dist_table::DummyDataSource;
1196    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
1197
1198    use super::*;
1199    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
1200
1201    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
1202        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
1203            ColumnSchema::new(
1204                ts_name,
1205                ConcreteDataType::timestamp_millisecond_datatype(),
1206                false,
1207            )
1208            .with_time_index(true),
1209            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
1210        ])
1211        .unwrap()
1212        .build()
1213        .unwrap();
1214        let meta = TableMetaBuilder::empty()
1215            .schema(Arc::new(schema))
1216            .primary_key_indices(vec![])
1217            .value_indices(vec![1])
1218            .engine("mito")
1219            .next_column_id(0)
1220            .options(Default::default())
1221            .created_on(Default::default())
1222            .region_numbers(vec![0])
1223            .build()
1224            .unwrap();
1225        let info = Arc::new(
1226            TableInfoBuilder::default()
1227                .table_id(1)
1228                .table_version(0)
1229                .name("test_table")
1230                .schema_name(DEFAULT_SCHEMA_NAME)
1231                .catalog_name(DEFAULT_CATALOG_NAME)
1232                .desc(None)
1233                .table_type(TableType::Base)
1234                .meta(meta)
1235                .build()
1236                .unwrap(),
1237        );
1238        Arc::new(table::Table::new(
1239            info,
1240            table::metadata::FilterPushDownType::Unsupported,
1241            Arc::new(DummyDataSource),
1242        ))
1243    }
1244
1245    #[tokio::test]
1246    async fn test_accommodate_existing_schema_logic() {
1247        let ts_name = "my_ts";
1248        let field_name = "my_field";
1249        let table = make_table_ref_with_schema(ts_name, field_name);
1250
1251        // The request uses different names for timestamp and field columns
1252        let mut req = RowInsertRequest {
1253            table_name: "test_table".to_string(),
1254            rows: Some(Rows {
1255                schema: vec![
1256                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
1257                    field_column_schema("field_wrong", ColumnDataType::Float64),
1258                ],
1259                rows: vec![api::v1::Row {
1260                    values: vec![Value::default(), Value::default()],
1261                }],
1262            }),
1263        };
1264        let ctx = Arc::new(QueryContext::with(
1265            DEFAULT_CATALOG_NAME,
1266            DEFAULT_SCHEMA_NAME,
1267        ));
1268
1269        let kv_backend = prepare_mocked_backend().await;
1270        let inserter = Inserter::new(
1271            catalog::memory::MemoryCatalogManager::new(),
1272            create_partition_rule_manager(kv_backend.clone()).await,
1273            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
1274            Arc::new(new_table_flownode_set_cache(
1275                String::new(),
1276                Cache::new(100),
1277                kv_backend.clone(),
1278            )),
1279        );
1280        let alter_expr = inserter
1281            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
1282            .unwrap();
1283        assert!(alter_expr.is_none());
1284
1285        // The request's schema should have updated names for timestamp and field columns
1286        let req_schema = req.rows.as_ref().unwrap().schema.clone();
1287        assert_eq!(req_schema[0].column_name, ts_name);
1288        assert_eq!(req_schema[1].column_name, field_name);
1289    }
1290}