operator/insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_skipping;
use api::v1::region::{
    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
    RegionRequestHeader,
};
use api::v1::{
    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
    RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
    default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
    TRACE_ID_COLUMN, TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY,
};
use common_grpc_expr::util::ColumnExpr;
use common_meta::cache::TableFlownodeSetCacheRef;
use common_meta::node_manager::{AffectedRows, NodeManagerRef};
use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::schema::SkippingIndexOptions;
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use snafu::ResultExt;
use sql::partition::partition_rule_for_hexstring;
use sql::statements::create::Partitions;
use sql::statements::insert::Insert;
use store_api::metric_engine_consts::{
    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{
    InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TABLE_DATA_MODEL,
    TABLE_DATA_MODEL_TRACE_V1, VALID_TABLE_OPTION_KEYS,
};
use table::table_reference::TableReference;
use table::TableRef;

use crate::error::{
    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_helper;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{
    fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;

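/// Dispatches insert requests to datanode regions and mirrors them to flownodes,
/// creating or altering the target tables on demand.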
pub struct Inserter {
    catalog_manager: CatalogManagerRef,
    pub(crate) partition_manager: PartitionRuleManagerRef,
    pub(crate) node_manager: NodeManagerRef,
    table_flownode_set_cache: TableFlownodeSetCacheRef,
}

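/// Shared reference to an [`Inserter`].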
pub type InserterRef = Arc<Inserter>;

/// Hint for the table type to create automatically.
#[derive(Clone)]
enum AutoCreateTableType {
    /// A logical table with the physical table name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows by `last_non_null` strategy.
    LastNonNull,
    /// A trace table with indexes and default partition rules built on trace_id.
    Trace,
}

impl AutoCreateTableType {
    fn as_str(&self) -> &'static str {
        match self {
            AutoCreateTableType::Logical(_) => "logical",
            AutoCreateTableType::Physical => "physical",
            AutoCreateTableType::Log => "log",
            AutoCreateTableType::LastNonNull => "last_non_null",
            AutoCreateTableType::Trace => "trace",
        }
    }
}

/// Insert requests split into normal and instant requests.
///
/// Instant requests are requests for tables with ttl=instant,
/// while normal requests target tables with any other ttl value.
///
/// The two kinds of requests are processed differently.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests with normal ttl.
    pub normal_requests: RegionInsertRequests,
    /// Requests with ttl=instant.
    /// These are discarded immediately at the frontend without ever being inserted
    /// into a memtable; they are only sent to flownodes if needed.
    pub instant_requests: RegionInsertRequests,
}

impl Inserter {
    pub fn new(
        catalog_manager: CatalogManagerRef,
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        table_flownode_set_cache: TableFlownodeSetCacheRef,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            node_manager,
            table_flownode_set_cache,
        }
    }

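    /// Handles column-based insert requests by converting them into row-based
    /// requests and delegating to [`Self::handle_row_inserts`].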
    pub async fn handle_column_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        let row_inserts = ColumnToRow::convert(requests)?;
        self.handle_row_inserts(row_inserts, ctx, statement_executor)
            .await
    }

    /// Handles row insert requests and creates a physical table on demand.
    pub async fn handle_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        preprocess_row_insert_requests(&mut requests.inserts)?;
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Physical,
        )
        .await
    }

    /// Handles row insert requests and creates a log table on demand.
    pub async fn handle_log_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Log,
        )
        .await
    }

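    /// Handles row insert requests and creates a trace table on demand.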
    pub async fn handle_trace_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::Trace,
        )
        .await
    }

    /// Handles row insert requests and creates a table with the `last_non_null` merge mode on demand.
    pub async fn handle_last_non_null_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Output> {
        self.handle_row_inserts_with_create_type(
            requests,
            ctx,
            statement_executor,
            AutoCreateTableType::LastNonNull,
        )
        .await
    }

    /// Handles row insert requests with the specified [AutoCreateTableType].
    async fn handle_row_inserts_with_create_type(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        create_type: AutoCreateTableType,
    ) -> Result<Output> {
        // remove empty requests
        requests.inserts.retain(|req| {
            req.rows
                .as_ref()
                .map(|r| !r.rows.is_empty())
                .unwrap_or_default()
        });
        validate_column_count_match(&requests)?;

        let CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        } = self
            .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
            .await?;

        let name_to_info = table_infos
            .values()
            .map(|info| (info.name.clone(), info.clone()))
            .collect::<HashMap<_, _>>();
        let inserts = RowToRegion::new(
            name_to_info,
            instant_table_ids,
            self.partition_manager.as_ref(),
        )
        .convert(requests)
        .await?;

        self.do_request(inserts, &table_infos, &ctx).await
    }

    /// Handles row insert requests with the metric engine.
    pub async fn handle_metric_row_inserts(
        &self,
        mut requests: RowInsertRequests,
        ctx: QueryContextRef,
        statement_executor: &StatementExecutor,
        physical_table: String,
    ) -> Result<Output> {
        // remove empty requests
        requests.inserts.retain(|req| {
            req.rows
                .as_ref()
                .map(|r| !r.rows.is_empty())
                .unwrap_or_default()
        });
        validate_column_count_match(&requests)?;

        // check and create physical table
        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
            .await?;

        // check and create logical tables
        let CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        } = self
            .create_or_alter_tables_on_demand(
                &requests,
                &ctx,
                AutoCreateTableType::Logical(physical_table.to_string()),
                statement_executor,
            )
            .await?;
        let name_to_info = table_infos
            .values()
            .map(|info| (info.name.clone(), info.clone()))
            .collect::<HashMap<_, _>>();
        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
            .convert(requests)
            .await?;

        self.do_request(inserts, &table_infos, &ctx).await
    }

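    /// Handles a table insert request ([`TableInsertRequest`]) against an existing table
    /// by converting it into region insert requests.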
    pub async fn handle_table_insert(
        &self,
        request: TableInsertRequest,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let catalog = request.catalog_name.as_str();
        let schema = request.schema_name.as_str();
        let table_name = request.table_name.as_str();
        let table = self.get_table(catalog, schema, table_name).await?;
        let table = table.with_context(|| TableNotFoundSnafu {
            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
        })?;
        let table_info = table.table_info();

        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
            .convert(request)
            .await?;

        let table_infos =
            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());

        self.do_request(inserts, &table_infos, &ctx).await
    }

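    /// Handles an `INSERT` statement by converting it into region insert requests
    /// for the target table.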
    pub async fn handle_statement_insert(
        &self,
        insert: &Insert,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        let (inserts, table_info) =
            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
                .convert(insert, ctx)
                .await?;

        let table_infos =
            HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());

        self.do_request(inserts, &table_infos, ctx).await
    }
}

impl Inserter {
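    /// Writes the normal requests to datanodes, mirrors both normal and instant requests
    /// to flownodes, and returns the total number of affected rows reported by the datanodes.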
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Fill impure default values in the request
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror requests for source table to flownode asynchronously
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write requests to datanode and wait for response
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }

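    /// Groups region insert requests by the peer (datanode) that currently leads each region.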
    async fn group_requests_by_peer(
        &self,
        requests: RegionInsertRequests,
    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
        // Group by region id first to avoid repeated calls to `find_region_leader`.
        // TODO(discord9): determine if an additional clone is worth it
        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
        for req in requests.requests {
            let region_id = RegionId::from_u64(req.region_id);
            requests_per_region
                .entry(region_id)
                .or_default()
                .requests
                .push(req);
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (region_id, reqs) in requests_per_region {
            let peer = self
                .partition_manager
                .find_region_leader(region_id)
                .await
                .context(FindRegionLeaderSnafu)?;
            inserts
                .entry(peer)
                .or_default()
                .requests
                .extend(reqs.requests);
        }

        Ok(inserts)
    }

    /// Creates or alters tables on demand:
    /// - if the table does not exist, creates it from the inferred `CreateTableExpr`
    /// - if the table exists, checks whether the schema matches; if new columns are found,
    ///   alters the table with the inferred `AlterTableExpr`
    ///
    /// Returns the table infos (keyed by table id) of the tables involved in the requests,
    /// together with the ids of `ttl=instant` tables. The result is used in the
    /// `RowToRegion` conversion.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
    ) -> Result<CreateAlterTableResult> {
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // If `auto_create_table` hint is disabled, skip creating/altering tables.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        let mut instant_table_ids = HashSet::new();

        for req in &requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                    if let Some(alter_expr) =
                        self.get_alter_table_expr_on_demand(req, &table, ctx)?
                    {
                        alter_tables.push(alter_expr);
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

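        // Create the missing tables and apply the collected schema changes
        // according to the auto-create table type.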
        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                if !create_tables.is_empty() {
                    // Creates logical tables in batch.
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    // Alters logical tables in batch.
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // Note that an auto-created table shouldn't be a ttl=instant table,
                // since that would be very unexpected behavior; it must be set by the user explicitly.
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                // Note that an auto-created table shouldn't be a ttl=instant table,
                // since that would be very unexpected behavior; it must be set by the user explicitly.
                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name) {
                        // Disable append mode for trace services table since it requires upsert behavior.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // Prebuilt partition rules for uuid data; see the function
                        // for more information.
                        let partitions = partition_rule_for_hexstring(TRACE_ID_COLUMN)
                            .context(CreatePartitionRulesSnafu)?;
                        // Add a skipping index to:
                        // - trace_id: when searching by trace id
                        // - parent_span_id: when searching for the root span
                        // - service_name: when searching for spans of certain services
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // use table_options to mark table model version
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                Some(partitions),
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }

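    /// Creates the physical table used by the metric engine if it does not already exist.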
    async fn create_physical_table_on_demand(
        &self,
        ctx: &QueryContextRef,
        physical_table: String,
        statement_executor: &StatementExecutor,
    ) -> Result<()> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();

        // Check if the physical table already exists.
        if self
            .get_table(catalog_name, &schema_name, &physical_table)
            .await?
            .is_some()
        {
            return Ok(());
        }

        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
        info!("Physical metric table `{table_reference}` does not exist, try creating table");

        // schema with timestamp and field column
        let default_schema = vec![
            ColumnSchema {
                column_name: GREPTIME_TIMESTAMP.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                datatype_extension: None,
                options: None,
            },
            ColumnSchema {
                column_name: GREPTIME_VALUE.to_string(),
                datatype: ColumnDataType::Float64 as _,
                semantic_type: SemanticType::Field as _,
                datatype_extension: None,
                options: None,
            },
        ];
        let create_table_expr =
            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        create_table_expr
            .table_options
            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());

        // create physical table
        let res = statement_executor
            .create_table_inner(create_table_expr, None, ctx.clone())
            .await;

        match res {
            Ok(_) => {
                info!("Successfully created table {table_reference}",);
                Ok(())
            }
            Err(err) => {
                error!(err; "Failed to create table {table_reference}");
                Err(err)
            }
        }
    }

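    /// Looks up a table from the catalog manager, returning `None` if it does not exist.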
    async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>> {
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)
    }

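    /// Builds a `CreateTableExpr` for the table targeted by the request, choosing the
    /// engine and table options according to the [AutoCreateTableType] and query context.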
    fn get_create_table_expr_on_demand(
        &self,
        req: &RowInsertRequest,
        create_type: &AutoCreateTableType,
        ctx: &QueryContextRef,
    ) -> Result<CreateTableExpr> {
        let mut table_options = Vec::with_capacity(4);
        for key in VALID_TABLE_OPTION_KEYS {
            if let Some(value) = ctx.extension(key) {
                table_options.push((key, value));
            }
        }

        let mut engine_name = default_engine();
        match create_type {
            AutoCreateTableType::Logical(physical_table) => {
                engine_name = METRIC_ENGINE_NAME;
                table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
            }
            AutoCreateTableType::Physical => {
                if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
                    table_options.push((APPEND_MODE_KEY, append_mode));
                }
                if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
                    table_options.push((MERGE_MODE_KEY, merge_mode));
                }
            }
            // Set append_mode to true for log tables,
            // because log tables should keep rows with the same ts and tags.
            AutoCreateTableType::Log => {
                table_options.push((APPEND_MODE_KEY, "true"));
            }
            AutoCreateTableType::LastNonNull => {
                table_options.push((MERGE_MODE_KEY, "last_non_null"));
            }
            AutoCreateTableType::Trace => {
                table_options.push((APPEND_MODE_KEY, "true"));
            }
        }

        let schema = ctx.current_schema();
        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let mut create_table_expr =
            build_create_table_expr(&table_ref, request_schema, engine_name)?;

        info!("Table `{table_ref}` does not exist, try creating table");
        for (k, v) in table_options {
            create_table_expr
                .table_options
                .insert(k.to_string(), v.to_string());
        }

        Ok(create_table_expr)
    }

    /// Returns an alter table expression if it finds new columns in the request.
    /// Columns that do not exist yet are always added.
    fn get_alter_table_expr_on_demand(
        &self,
        req: &RowInsertRequest,
        table: &TableRef,
        ctx: &QueryContextRef,
    ) -> Result<Option<AlterTableExpr>> {
        let catalog_name = ctx.current_catalog();
        let schema_name = ctx.current_schema();
        let table_name = table.table_info().name.clone();

        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
        let Some(add_columns) = add_columns else {
            return Ok(None);
        };

        Ok(Some(AlterTableExpr {
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.to_string(),
            table_name: table_name.to_string(),
            kind: Some(Kind::AddColumns(add_columns)),
        }))
    }

    /// Creates a table with options.
    async fn create_physical_table(
        &self,
        mut create_table_expr: CreateTableExpr,
        partitions: Option<Partitions>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<TableRef> {
        {
            let table_ref = TableReference::full(
                &create_table_expr.catalog_name,
                &create_table_expr.schema_name,
                &create_table_expr.table_name,
            );

            info!("Table `{table_ref}` does not exist, try creating table");
        }
        let res = statement_executor
            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
            .await;

        let table_ref = TableReference::full(
            &create_table_expr.catalog_name,
            &create_table_expr.schema_name,
            &create_table_expr.table_name,
        );

        match res {
            Ok(table) => {
                info!(
                    "Successfully created table {} with options: {:?}",
                    table_ref, create_table_expr.table_options,
                );
                Ok(table)
            }
            Err(err) => {
                error!(err; "Failed to create table {}", table_ref);
                Err(err)
            }
        }
    }

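    /// Creates logical tables in a single batch, logging the failed tables on error.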
    async fn create_logical_tables(
        &self,
        create_table_exprs: Vec<CreateTableExpr>,
        ctx: &QueryContextRef,
        statement_executor: &StatementExecutor,
    ) -> Result<Vec<TableRef>> {
        let res = statement_executor
            .create_logical_tables(&create_table_exprs, ctx.clone())
            .await;

        match res {
            Ok(res) => {
                info!("Successfully created logical tables");
                Ok(res)
            }
            Err(err) => {
                let failed_tables = create_table_exprs
                    .into_iter()
                    .map(|expr| {
                        format!(
                            "{}.{}.{}",
                            expr.catalog_name, expr.schema_name, expr.table_name
                        )
                    })
                    .collect::<Vec<_>>();
                error!(
                    err;
                    "Failed to create logical tables {:?}",
                    failed_tables
                );
                Err(err)
            }
        }
    }
}

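/// Ensures that every row in each insert request carries exactly one value per column
/// of the request schema.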
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
    for request in &requests.inserts {
        let rows = request.rows.as_ref().unwrap();
        let column_count = rows.schema.len();
        rows.rows.iter().try_for_each(|r| {
            ensure!(
                r.values.len() == column_count,
                InvalidInsertRequestSnafu {
                    reason: format!(
                        "column count mismatch, columns: {}, values: {}",
                        column_count,
                        r.values.len()
                    )
                }
            );
            Ok(())
        })?;
    }
    Ok(())
}

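/// Builds a `CreateTableExpr` from the request schema for the given table reference and engine.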
fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}

/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Table ids of ttl=instant tables.
    instant_table_ids: HashSet<TableId>,
    /// Table infos of the tables involved in the requests.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}

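/// Task that mirrors insert requests to the flownodes of their source tables.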
struct FlowMirrorTask {
    requests: HashMap<Peer, RegionInsertRequests>,
    num_rows: usize,
}

impl FlowMirrorTask {
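    /// Groups the given region insert requests by flownode peer, skipping tables
    /// that are not source tables of any flow.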
    async fn new(
        cache: &TableFlownodeSetCacheRef,
        requests: impl Iterator<Item = &RegionInsertRequest>,
    ) -> Result<Self> {
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();
        let mut num_rows = 0;

        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            match src_table_reqs.get_mut(&table_id) {
                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                // Already known not to be a source table.
                Some(None) => continue,
                _ => {
                    let peers = cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<Vec<_>>();

                    if !peers.is_empty() {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        num_rows += reqs
                            .requests
                            .iter()
                            .map(|r| r.rows.as_ref().unwrap().rows.len())
                            .sum::<usize>();
                        src_table_reqs.insert(table_id, Some((peers, reqs)));
                    } else {
                        // Insert an empty entry to avoid repeated queries.
                        src_table_reqs.insert(table_id, None);
                    }
                }
            }
        }

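        // Distribute each source table's requests to its flownode peer(s).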
        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();

        for (_table_id, (peers, reqs)) in src_table_reqs
            .into_iter()
            .filter_map(|(k, v)| v.map(|v| (k, v)))
        {
            if peers.len() == 1 {
                // fast path, zero copy
                inserts
                    .entry(peers[0].clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
                continue;
            } else {
                // TODO(discord9): need to split requests to multiple flownodes
                for flownode in peers {
                    inserts
                        .entry(flownode.clone())
                        .or_default()
                        .requests
                        .extend(reqs.requests.clone());
                }
            }
        }

        Ok(Self {
            requests: inserts,
            num_rows,
        })
    }

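    /// Spawns background tasks that send the mirrored requests to flownodes without
    /// waiting for the responses; failures are only logged.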
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            let node_manager = node_manager.clone();
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                match result {
                    Ok(resp) => {
                        let affected_rows = resp.affected_rows;
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
                    }
                    Err(err) => {
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }

        Ok(())
    }
}