operator/
statement.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::v1::RowInsertRequests;
33use catalog::CatalogManagerRef;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::process_manager::ProcessManagerRef;
36use client::RecordBatches;
37use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
38use client::inserter::{InsertOptions, Inserter};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::{debug, tracing, warn};
50use common_time::Timestamp;
51use common_time::range::TimestampRange;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use humantime::format_duration;
55use itertools::Itertools;
56use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
57use query::QueryEngineRef;
58use query::parser::QueryStatement;
59use session::context::{Channel, QueryContextBuilder, QueryContextRef};
60use session::table_name::table_idents_to_full_name;
61use set::{set_query_timeout, set_read_preference};
62use snafu::{OptionExt, ResultExt, ensure};
63use sql::ast::ObjectNamePartExt;
64use sql::statements::OptionMap;
65use sql::statements::copy::{
66    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
67};
68use sql::statements::set_variables::SetVariables;
69use sql::statements::show::ShowCreateTableVariant;
70use sql::statements::statement::Statement;
71use sql::util::format_raw_object_name;
72use sqlparser::ast::ObjectName;
73use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
74use table::TableRef;
75use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
76use table::table_name::TableName;
77use table::table_reference::TableReference;
78
79use self::set::{
80    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
81};
82use crate::error::{
83    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
84    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
85    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
86};
87use crate::insert::InserterRef;
88use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
89use crate::statement::set::set_allow_query_fallback;
90
/// A configurator that customizes or enhances a [`StatementExecutor`].
///
/// Implementations receive the executor by value together with an
/// [`ExecutorConfigureContext`] and hand back the executor to use from then
/// on, or a [`BoxedError`] if configuration fails.
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
    /// Consumes `executor` and returns the configured instance.
    async fn configure(
        &self,
        executor: StatementExecutor,
        ctx: ExecutorConfigureContext,
    ) -> std::result::Result<StatementExecutor, BoxedError>;
}
100
/// Reference-counted handle to a [`StatementExecutorConfigurator`] trait object.
pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
102
/// Context handed to [`StatementExecutorConfigurator::configure`].
pub struct ExecutorConfigureContext {
    /// Key-value backend handle made available to the configurator.
    pub kv_backend: KvBackendRef,
}
106
/// Executes SQL statements by dispatching them to the query engine and to the
/// catalog, metadata and procedure components it holds.
#[derive(Clone)]
pub struct StatementExecutor {
    /// Used for schema and table lookups.
    catalog_manager: CatalogManagerRef,
    /// Plans statements and executes logical plans.
    query_engine: QueryEngineRef,
    /// Exposed through [`StatementExecutor::procedure_executor`].
    procedure_executor: ProcedureExecutorRef,
    /// Built in [`StatementExecutor::new`] on top of the key-value backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Built in [`StatementExecutor::new`] on top of the key-value backend.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Built in [`StatementExecutor::new`] on top of the key-value backend.
    view_info_manager: ViewInfoManagerRef,
    /// Built in [`StatementExecutor::new`] from the key-value backend and the
    /// table route cache.
    partition_manager: PartitionRuleManagerRef,
    /// Exposed through [`StatementExecutor::cache_invalidator`].
    cache_invalidator: CacheInvalidatorRef,
    /// Exposed (crate-internally) through [`StatementExecutor::inserter`].
    inserter: InserterRef,
    /// Optional process manager supplied by the caller of
    /// [`StatementExecutor::new`].
    process_manager: Option<ProcessManagerRef>,
    /// Enterprise only; `None` until set via `with_trigger_querier`.
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}
122
/// Reference-counted handle to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
124
/// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Queries the `SHOW CREATE TRIGGER` statement for the given trigger.
    ///
    /// `catalog` and `trigger` name the trigger to look up; failures are
    /// reported as a [`BoxedError`].
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    /// Returns `self` as [`std::any::Any`], e.g. for downcasting to the
    /// concrete implementation.
    fn as_any(&self) -> &dyn std::any::Any;
}
139
/// Reference-counted handle to a [`TriggerQuerier`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
142
143impl StatementExecutor {
144    #[allow(clippy::too_many_arguments)]
145    pub fn new(
146        catalog_manager: CatalogManagerRef,
147        query_engine: QueryEngineRef,
148        procedure_executor: ProcedureExecutorRef,
149        kv_backend: KvBackendRef,
150        cache_invalidator: CacheInvalidatorRef,
151        inserter: InserterRef,
152        table_route_cache: TableRouteCacheRef,
153        process_manager: Option<ProcessManagerRef>,
154    ) -> Self {
155        Self {
156            catalog_manager,
157            query_engine,
158            procedure_executor,
159            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
160            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
161            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
162            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
163            cache_invalidator,
164            inserter,
165            process_manager,
166            #[cfg(feature = "enterprise")]
167            trigger_querier: None,
168        }
169    }
170
171    #[cfg(feature = "enterprise")]
172    pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
173        self.trigger_querier = Some(querier);
174        self
175    }
176
177    #[cfg(feature = "testing")]
178    pub async fn execute_stmt(
179        &self,
180        stmt: QueryStatement,
181        query_ctx: QueryContextRef,
182    ) -> Result<Output> {
183        match stmt {
184            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
185            QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
186        }
187    }
188
189    #[tracing::instrument(skip_all)]
190    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
191        match stmt {
192            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
193                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
194            }
195
196            Statement::DeclareCursor(declare_cursor) => {
197                self.declare_cursor(declare_cursor, query_ctx).await
198            }
199            Statement::FetchCursor(fetch_cursor) => {
200                self.fetch_cursor(fetch_cursor, query_ctx).await
201            }
202            Statement::CloseCursor(close_cursor) => {
203                self.close_cursor(close_cursor, query_ctx).await
204            }
205
206            Statement::Insert(insert) => self.insert(insert, query_ctx).await,
207
208            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
209
210            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
211
212            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
213
214            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
215
216            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
217
218            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
219
220            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
221
222            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
223
224            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
225
226            #[cfg(feature = "enterprise")]
227            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
228
229            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
230                let query_output = self
231                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
232                    .await?;
233                let req = to_copy_query_request(stmt.arg)?;
234
235                self.copy_query_to(req, query_output)
236                    .await
237                    .map(Output::new_with_affected_rows)
238            }
239
240            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
241                let req = to_copy_table_request(stmt, query_ctx.clone())?;
242                match req.direction {
243                    CopyDirection::Export => self
244                        .copy_table_to(req, query_ctx)
245                        .await
246                        .map(Output::new_with_affected_rows),
247                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
248                }
249            }
250
251            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
252                match copy_database {
253                    CopyDatabase::To(arg) => {
254                        self.copy_database_to(
255                            to_copy_database_request(arg, &query_ctx)?,
256                            query_ctx.clone(),
257                        )
258                        .await
259                    }
260                    CopyDatabase::From(arg) => {
261                        self.copy_database_from(
262                            to_copy_database_request(arg, &query_ctx)?,
263                            query_ctx,
264                        )
265                        .await
266                    }
267                }
268            }
269
270            Statement::CreateTable(stmt) => {
271                let _ = self.create_table(stmt, query_ctx).await?;
272                Ok(Output::new_with_affected_rows(0))
273            }
274            Statement::CreateTableLike(stmt) => {
275                let _ = self.create_table_like(stmt, query_ctx).await?;
276                Ok(Output::new_with_affected_rows(0))
277            }
278            Statement::CreateExternalTable(stmt) => {
279                let _ = self.create_external_table(stmt, query_ctx).await?;
280                Ok(Output::new_with_affected_rows(0))
281            }
282            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
283            #[cfg(feature = "enterprise")]
284            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
285            Statement::DropFlow(stmt) => {
286                self.drop_flow(
287                    query_ctx.current_catalog().to_string(),
288                    format_raw_object_name(stmt.flow_name()),
289                    stmt.drop_if_exists(),
290                    query_ctx,
291                )
292                .await
293            }
294            #[cfg(feature = "enterprise")]
295            Statement::DropTrigger(stmt) => {
296                self.drop_trigger(
297                    query_ctx.current_catalog().to_string(),
298                    format_raw_object_name(stmt.trigger_name()),
299                    stmt.drop_if_exists(),
300                    query_ctx,
301                )
302                .await
303            }
304            Statement::CreateView(stmt) => {
305                let _ = self.create_view(stmt, query_ctx).await?;
306                Ok(Output::new_with_affected_rows(0))
307            }
308            Statement::DropView(stmt) => {
309                let (catalog_name, schema_name, view_name) =
310                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
311                        .map_err(BoxedError::new)
312                        .context(ExternalSnafu)?;
313
314                self.drop_view(
315                    catalog_name,
316                    schema_name,
317                    view_name,
318                    stmt.drop_if_exists,
319                    query_ctx,
320                )
321                .await
322            }
323            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
324
325            Statement::AlterDatabase(alter_database) => {
326                self.alter_database(alter_database, query_ctx).await
327            }
328
329            #[cfg(feature = "enterprise")]
330            Statement::AlterTrigger(alter_trigger) => {
331                self.alter_trigger(alter_trigger, query_ctx).await
332            }
333
334            Statement::DropTable(stmt) => {
335                let mut table_names = Vec::with_capacity(stmt.table_names().len());
336                for table_name_stmt in stmt.table_names() {
337                    let (catalog, schema, table) =
338                        table_idents_to_full_name(table_name_stmt, &query_ctx)
339                            .map_err(BoxedError::new)
340                            .context(ExternalSnafu)?;
341                    table_names.push(TableName::new(catalog, schema, table));
342                }
343                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
344                    .await
345            }
346            Statement::DropDatabase(stmt) => {
347                self.drop_database(
348                    query_ctx.current_catalog().to_string(),
349                    format_raw_object_name(stmt.name()),
350                    stmt.drop_if_exists(),
351                    query_ctx,
352                )
353                .await
354            }
355            Statement::TruncateTable(stmt) => {
356                let (catalog, schema, table) =
357                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
358                        .map_err(BoxedError::new)
359                        .context(ExternalSnafu)?;
360                let table_name = TableName::new(catalog, schema, table);
361                let time_ranges = self
362                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
363                    .await?;
364                self.truncate_table(table_name, time_ranges, query_ctx)
365                    .await
366            }
367            Statement::CreateDatabase(stmt) => {
368                self.create_database(
369                    &format_raw_object_name(&stmt.name),
370                    stmt.if_not_exists,
371                    stmt.options.into_map(),
372                    query_ctx,
373                )
374                .await
375            }
376            Statement::ShowCreateDatabase(show) => {
377                let (catalog, database) =
378                    idents_to_full_database_name(&show.database_name, &query_ctx)
379                        .map_err(BoxedError::new)
380                        .context(ExternalSnafu)?;
381                let table_metadata_manager = self
382                    .catalog_manager
383                    .as_any()
384                    .downcast_ref::<KvBackendCatalogManager>()
385                    .map(|manager| manager.table_metadata_manager_ref().clone())
386                    .context(UpgradeCatalogManagerRefSnafu)?;
387                let opts: HashMap<String, String> = table_metadata_manager
388                    .schema_manager()
389                    .get(SchemaNameKey::new(&catalog, &database))
390                    .await
391                    .context(TableMetadataManagerSnafu)?
392                    .context(SchemaNotFoundSnafu {
393                        schema_info: &database,
394                    })?
395                    .into_inner()
396                    .into();
397
398                self.show_create_database(&database, opts.into()).await
399            }
400            Statement::ShowCreateTable(show) => {
401                let (catalog, schema, table) =
402                    table_idents_to_full_name(&show.table_name, &query_ctx)
403                        .map_err(BoxedError::new)
404                        .context(ExternalSnafu)?;
405
406                let table_ref = self
407                    .catalog_manager
408                    .table(&catalog, &schema, &table, Some(&query_ctx))
409                    .await
410                    .context(CatalogSnafu)?
411                    .context(TableNotFoundSnafu { table_name: &table })?;
412                let table_name = TableName::new(catalog, schema, table);
413
414                match show.variant {
415                    ShowCreateTableVariant::Original => {
416                        self.show_create_table(table_name, table_ref, query_ctx)
417                            .await
418                    }
419                    ShowCreateTableVariant::PostgresForeignTable => {
420                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
421                            .await
422                    }
423                }
424            }
425            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
426            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
427            #[cfg(feature = "enterprise")]
428            Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
429            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
430            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
431            Statement::ShowColumns(show_columns) => {
432                self.show_columns(show_columns, query_ctx).await
433            }
434            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
435            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
436            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
437            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
438            Statement::Use(db) => self.use_database(db, query_ctx).await,
439            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
440            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
441            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
442        }
443    }
444
445    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
446        let catalog = query_ctx.current_catalog();
447        ensure!(
448            self.catalog_manager
449                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
450                .await
451                .context(CatalogSnafu)?,
452            SchemaNotFoundSnafu { schema_info: &db }
453        );
454
455        query_ctx.set_current_schema(&db);
456
457        Ok(Output::new_with_record_batches(RecordBatches::empty()))
458    }
459
460    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
461        let var_name = set_var.variable.to_string().to_uppercase();
462
463        debug!(
464            "Trying to set {}={} for session: {} ",
465            var_name,
466            set_var.value.iter().map(|e| e.to_string()).join(", "),
467            query_ctx.conn_info()
468        );
469
470        match var_name.as_str() {
471            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
472
473            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
474                set_timezone(set_var.value, query_ctx)?
475            }
476
477            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
478
479            // Same as "bytea_output", we just ignore it here.
480            // Not harmful since it only relates to how date is viewed in client app's output.
481            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
482            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
483
484            // Allow query to fallback when failed to push down.
485            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
486
487            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
488            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
489                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
490                Channel::Postgres => {
491                    warn!(
492                        "Unsupported set variable {} for channel {:?}",
493                        var_name,
494                        query_ctx.channel()
495                    );
496                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
497                }
498                _ => {
499                    return NotSupportedSnafu {
500                        feat: format!("Unsupported set variable {}", var_name),
501                    }
502                    .fail();
503                }
504            },
505            "STATEMENT_TIMEOUT" => match query_ctx.channel() {
506                Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
507                Channel::Mysql => {
508                    warn!(
509                        "Unsupported set variable {} for channel {:?}",
510                        var_name,
511                        query_ctx.channel()
512                    );
513                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
514                }
515                _ => {
516                    return NotSupportedSnafu {
517                        feat: format!("Unsupported set variable {}", var_name),
518                    }
519                    .fail();
520                }
521            },
522            "SEARCH_PATH" => {
523                if query_ctx.channel() == Channel::Postgres {
524                    set_search_path(set_var.value, query_ctx)?
525                } else {
526                    return NotSupportedSnafu {
527                        feat: format!("Unsupported set variable {}", var_name),
528                    }
529                    .fail();
530                }
531            }
532            _ => {
533                if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
534                {
535                    // For unknown SET statements, we give a warning with success.
536                    // This prevents the SET call from becoming a blocker of MySQL/Postgres clients'
537                    // connection establishment.
538                    warn!(
539                        "Unsupported set variable {} for channel {:?}",
540                        var_name,
541                        query_ctx.channel()
542                    );
543                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
544                } else {
545                    return NotSupportedSnafu {
546                        feat: format!("Unsupported set variable {}", var_name),
547                    }
548                    .fail();
549                }
550            }
551        }
552        Ok(Output::new_with_affected_rows(0))
553    }
554
555    #[tracing::instrument(skip_all)]
556    pub async fn plan(
557        &self,
558        stmt: &QueryStatement,
559        query_ctx: QueryContextRef,
560    ) -> Result<LogicalPlan> {
561        self.query_engine
562            .planner()
563            .plan(stmt, query_ctx)
564            .await
565            .context(PlanStatementSnafu)
566    }
567
568    /// Execute [`LogicalPlan`] directly.
569    #[tracing::instrument(skip_all)]
570    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
571        self.query_engine
572            .execute(plan, query_ctx)
573            .await
574            .context(ExecLogicalPlanSnafu)
575    }
576
577    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
578        self.query_engine
579            .planner()
580            .optimize(plan)
581            .context(PlanStatementSnafu)
582    }
583
584    #[tracing::instrument(skip_all)]
585    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
586        let plan = self.plan(&stmt, query_ctx.clone()).await?;
587        self.exec_plan(plan, query_ctx).await
588    }
589
590    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
591        let TableReference {
592            catalog,
593            schema,
594            table,
595        } = table_ref;
596        self.catalog_manager
597            .table(catalog, schema, table, None)
598            .await
599            .context(CatalogSnafu)?
600            .with_context(|| TableNotFoundSnafu {
601                table_name: table_ref.to_string(),
602            })
603    }
604
605    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
606        &self.procedure_executor
607    }
608
609    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
610        &self.cache_invalidator
611    }
612
613    /// Convert truncate time ranges for the given table from sql values to timestamps
614    ///
615    pub async fn convert_truncate_time_ranges(
616        &self,
617        table_name: &TableName,
618        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
619        query_ctx: &QueryContextRef,
620    ) -> Result<Vec<(Timestamp, Timestamp)>> {
621        if sql_values_time_range.is_empty() {
622            return Ok(vec![]);
623        }
624        let table = self.get_table(&table_name.table_ref()).await?;
625        let info = table.table_info();
626        let time_index_dt = info
627            .meta
628            .schema
629            .timestamp_column()
630            .context(UnexpectedSnafu {
631                violated: "Table must have a timestamp column",
632            })?;
633
634        let time_unit = time_index_dt
635            .data_type
636            .as_timestamp()
637            .with_context(|| UnexpectedSnafu {
638                violated: format!(
639                    "Table {}'s time index column must be a timestamp type, found: {:?}",
640                    table_name, time_index_dt
641                ),
642            })?
643            .unit();
644
645        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
646        for (start, end) in sql_values_time_range {
647            let start = common_sql::convert::sql_value_to_value(
648                "range_start",
649                &ConcreteDataType::timestamp_datatype(time_unit),
650                start,
651                Some(&query_ctx.timezone()),
652                None,
653                false,
654            )
655            .context(SqlCommonSnafu)
656            .and_then(|v| {
657                if let datatypes::value::Value::Timestamp(t) = v {
658                    Ok(t)
659                } else {
660                    error::InvalidSqlSnafu {
661                        err_msg: format!("Expected a timestamp value, found {v:?}"),
662                    }
663                    .fail()
664                }
665            })?;
666
667            let end = common_sql::convert::sql_value_to_value(
668                "range_end",
669                &ConcreteDataType::timestamp_datatype(time_unit),
670                end,
671                Some(&query_ctx.timezone()),
672                None,
673                false,
674            )
675            .context(SqlCommonSnafu)
676            .and_then(|v| {
677                if let datatypes::value::Value::Timestamp(t) = v {
678                    Ok(t)
679                } else {
680                    error::InvalidSqlSnafu {
681                        err_msg: format!("Expected a timestamp value, found {v:?}"),
682                    }
683                    .fail()
684                }
685            })?;
686            time_ranges.push((start, end));
687        }
688        Ok(time_ranges)
689    }
690
691    /// Returns the inserter for the statement executor.
692    pub(crate) fn inserter(&self) -> &InserterRef {
693        &self.inserter
694    }
695}
696
697fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
698    let CopyQueryToArgument {
699        with,
700        connection,
701        location,
702    } = stmt;
703
704    Ok(CopyQueryToRequest {
705        location,
706        with: with.into_map(),
707        connection: connection.into_map(),
708    })
709}
710
711// Verifies time related format is valid
712fn verify_time_related_format(with: &OptionMap) -> Result<()> {
713    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
714    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
715    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
716    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
717
718    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
719        ensure!(
720            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
721            error::TimestampFormatNotSupportedSnafu {
722                format: "<unknown>".to_string(),
723                file_format: file_format.unwrap_or_default(),
724            }
725        );
726    }
727
728    for (key, format_opt) in [
729        (common_datasource::file_format::TIME_FORMAT, time_format),
730        (common_datasource::file_format::DATE_FORMAT, date_format),
731        (
732            common_datasource::file_format::TIMESTAMP_FORMAT,
733            timestamp_format,
734        ),
735    ] {
736        if let Some(format) = format_opt {
737            chrono::format::strftime::StrftimeItems::new(format)
738                .parse()
739                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
740        }
741    }
742
743    Ok(())
744}
745
746fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
747    let direction = match stmt {
748        CopyTable::To(_) => CopyDirection::Export,
749        CopyTable::From(_) => CopyDirection::Import,
750    };
751
752    let CopyTableArgument {
753        location,
754        connection,
755        with,
756        table_name,
757        limit,
758        ..
759    } = match stmt {
760        CopyTable::To(arg) => arg,
761        CopyTable::From(arg) => arg,
762    };
763    let (catalog_name, schema_name, table_name) =
764        table_idents_to_full_name(&table_name, &query_ctx)
765            .map_err(BoxedError::new)
766            .context(ExternalSnafu)?;
767
768    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
769
770    verify_time_related_format(&with)?;
771
772    let pattern = with
773        .get(common_datasource::file_format::FILE_PATTERN)
774        .map(|x| x.to_string());
775
776    Ok(CopyTableRequest {
777        catalog_name,
778        schema_name,
779        table_name,
780        location,
781        with: with.into_map(),
782        connection: connection.into_map(),
783        pattern,
784        direction,
785        timestamp_range,
786        limit,
787    })
788}
789
790/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
791/// This function extracts the necessary info including catalog/database name, time range, etc.
792fn to_copy_database_request(
793    arg: CopyDatabaseArgument,
794    query_ctx: &QueryContextRef,
795) -> Result<CopyDatabaseRequest> {
796    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
797        .map_err(BoxedError::new)
798        .context(ExternalSnafu)?;
799    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
800
801    Ok(CopyDatabaseRequest {
802        catalog_name,
803        schema_name: database_name,
804        location: arg.location,
805        with: arg.with.into_map(),
806        connection: arg.connection.into_map(),
807        time_range,
808    })
809}
810
811/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
812/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
813/// The timezone used for conversion will respect that inside `query_ctx`.
814fn timestamp_range_from_option_map(
815    options: &OptionMap,
816    query_ctx: &QueryContextRef,
817) -> Result<Option<TimestampRange>> {
818    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
819    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
820    let time_range = match (start_timestamp, end_timestamp) {
821        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
822            error::InvalidTimestampRangeSnafu {
823                start: start.to_iso8601_string(),
824                end: end.to_iso8601_string(),
825            }
826        })?),
827        (Some(start), None) => Some(TimestampRange::from_start(start)),
828        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
829        (None, None) => None,
830    };
831    Ok(time_range)
832}
833
834/// Extracts timestamp from a [HashMap<String, String>] with given key.
835fn extract_timestamp(
836    map: &OptionMap,
837    key: &str,
838    query_ctx: &QueryContextRef,
839) -> Result<Option<Timestamp>> {
840    map.get(key)
841        .map(|v| {
842            Timestamp::from_str(v, Some(&query_ctx.timezone()))
843                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
844        })
845        .transpose()
846}
847
848fn idents_to_full_database_name(
849    obj_name: &ObjectName,
850    query_ctx: &QueryContextRef,
851) -> Result<(String, String)> {
852    match &obj_name.0[..] {
853        [database] => Ok((
854            query_ctx.current_catalog().to_owned(),
855            database.to_string_unquoted(),
856        )),
857        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
858        _ => InvalidSqlSnafu {
859            err_msg: format!(
860                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
861            ),
862        }
863        .fail(),
864    }
865}
866
867/// The [`Inserter`] implementation for the statement executor.
868pub struct InserterImpl {
869    statement_executor: StatementExecutorRef,
870    options: Option<InsertOptions>,
871}
872
873impl InserterImpl {
874    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
875        Self {
876            statement_executor,
877            options,
878        }
879    }
880}
881
882#[async_trait::async_trait]
883impl Inserter for InserterImpl {
884    async fn insert_rows(
885        &self,
886        context: &client::inserter::Context<'_>,
887        requests: RowInsertRequests,
888    ) -> ClientResult<()> {
889        let mut ctx_builder = QueryContextBuilder::default()
890            .current_catalog(context.catalog.to_string())
891            .current_schema(context.schema.to_string());
892        if let Some(options) = self.options.as_ref() {
893            ctx_builder = ctx_builder
894                .set_extension(
895                    TTL_KEY.to_string(),
896                    format_duration(options.ttl).to_string(),
897                )
898                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
899        }
900        let query_ctx = ctx_builder.build().into();
901
902        self.statement_executor
903            .inserter()
904            .handle_row_inserts(
905                requests,
906                query_ctx,
907                self.statement_executor.as_ref(),
908                false,
909                false,
910            )
911            .await
912            .map_err(BoxedError::new)
913            .context(ClientExternalSnafu)
914            .map(|_| ())
915    }
916
917    fn set_options(&mut self, options: &InsertOptions) {
918        self.options = Some(*options);
919    }
920}
921
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    /// Builds an [OptionMap] from borrowed key/value pairs.
    fn option_map(entries: &[(&str, &str)]) -> OptionMap {
        let raw: HashMap<String, String> = entries
            .iter()
            .map(|(key, value)| ((*key).to_owned(), (*value).to_owned()))
            .collect();
        OptionMap::from(raw)
    }

    /// Parses the `(start, end)` pair into a time range using the `Asia/Shanghai` timezone.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let options = option_map(&[
            (COPY_DATABASE_TIME_START_KEY, start),
            (COPY_DATABASE_TIME_END_KEY, end),
        ]);
        timestamp_range_from_option_map(&options, &query_ctx)
    }

    /// Runs the format verification with the given timestamp format and file format.
    fn verify(timestamp_format: &str, file_format: &str) -> error::Result<()> {
        let options = option_map(&[
            (
                common_datasource::file_format::TIMESTAMP_FORMAT,
                timestamp_format,
            ),
            (common_datasource::file_format::FORMAT_TYPE, file_format),
        ]);
        verify_time_related_format(&options)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // A well-ordered pair is converted using the session timezone.
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        assert_eq!(
            Some(expected),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap()
        );

        // An end before the start is rejected.
        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        // A valid pattern together with the csv format is accepted.
        assert!(verify("%Y-%m-%d %H:%M:%S", "csv").is_ok());

        // The same pattern is refused for a non-csv format.
        assert_matches!(
            verify("%Y-%m-%d %H:%M:%S", "json").unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );

        // A malformed pattern is refused even for csv.
        assert_matches!(
            verify("%111112", "csv").unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}