operator/statement.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod admin;
mod comment;
mod copy_database;
mod copy_query_to;
mod copy_table_from;
mod copy_table_to;
mod cursor;
pub mod ddl;
mod describe;
mod dml;
mod kill;
mod set;
mod show;
mod tql;

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::RowInsertRequests;
use catalog::CatalogManagerRef;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use client::RecordBatches;
use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
use client::inserter::{InsertOptions, Inserter};
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::{debug, tracing, warn};
use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use humantime::format_duration;
use itertools::Itertools;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::QueryEngineRef;
use query::parser::QueryStatement;
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use set::{set_query_timeout, set_read_preference};
use snafu::{OptionExt, ResultExt, ensure};
use sql::ast::ObjectNamePartExt;
use sql::statements::OptionMap;
use sql::statements::copy::{
    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
};
use sql::statements::set_variables::SetVariables;
use sql::statements::show::ShowCreateTableVariant;
use sql::statements::statement::Statement;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use table::TableRef;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
use table::table_name::TableName;
use table::table_reference::TableReference;

use self::set::{
    set_bytea_output, set_datestyle, set_intervalstyle, set_search_path, set_timezone,
    validate_client_encoding,
};
use crate::error::{
    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::statement::set::set_allow_query_fallback;

/// A configurator that customizes or enhances a [`StatementExecutor`].
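///
/// A minimal pass-through sketch (`NoopConfigurator` is a hypothetical type, not part
/// of this crate):
///
/// ```ignore
/// struct NoopConfigurator;
///
/// #[async_trait::async_trait]
/// impl StatementExecutorConfigurator for NoopConfigurator {
///     async fn configure(
///         &self,
///         executor: StatementExecutor,
///         _ctx: ExecutorConfigureContext,
///     ) -> std::result::Result<StatementExecutor, BoxedError> {
///         // Return the executor unchanged; a real configurator could wrap or
///         // extend it using the kv backend carried in `_ctx`.
///         Ok(executor)
///     }
/// }
/// ```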
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
    async fn configure(
        &self,
        executor: StatementExecutor,
        ctx: ExecutorConfigureContext,
    ) -> std::result::Result<StatementExecutor, BoxedError>;
}

pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;

pub struct ExecutorConfigureContext {
    pub kv_backend: KvBackendRef,
}

#[derive(Clone)]
pub struct StatementExecutor {
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    procedure_executor: ProcedureExecutorRef,
    table_metadata_manager: TableMetadataManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    view_info_manager: ViewInfoManagerRef,
    partition_manager: PartitionRuleManagerRef,
    cache_invalidator: CacheInvalidatorRef,
    inserter: InserterRef,
    process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}

pub type StatementExecutorRef = Arc<StatementExecutor>;

/// Trait for querying trigger info, e.g. `SHOW CREATE TRIGGER`.
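///
/// A hedged implementation sketch (`MyTriggerQuerier` is a hypothetical type, not part
/// of this crate):
///
/// ```ignore
/// struct MyTriggerQuerier;
///
/// #[async_trait::async_trait]
/// impl TriggerQuerier for MyTriggerQuerier {
///     async fn show_create_trigger(
///         &self,
///         catalog: &str,
///         trigger: &str,
///         query_ctx: &QueryContextRef,
///     ) -> std::result::Result<Output, BoxedError> {
///         // Look up the trigger definition and render it as an `Output`.
///         todo!()
///     }
///
///     fn as_any(&self) -> &dyn std::any::Any {
///         self
///     }
/// }
/// ```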
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Queries the `SHOW CREATE TRIGGER` statement for the given trigger.
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    fn as_any(&self) -> &dyn std::any::Any;
}

#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;

impl StatementExecutor {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
        procedure_executor: ProcedureExecutorRef,
        kv_backend: KvBackendRef,
        cache_invalidator: CacheInvalidatorRef,
        inserter: InserterRef,
        table_route_cache: TableRouteCacheRef,
        process_manager: Option<ProcessManagerRef>,
    ) -> Self {
        Self {
            catalog_manager,
            query_engine,
            procedure_executor,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
            cache_invalidator,
            inserter,
            process_manager,
            #[cfg(feature = "enterprise")]
            trigger_querier: None,
        }
    }

    #[cfg(feature = "enterprise")]
    pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
        self.trigger_querier = Some(querier);
        self
    }

    #[cfg(feature = "testing")]
    pub async fn execute_stmt(
        &self,
        stmt: QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        match stmt {
            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
            QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
        }
    }

    #[tracing::instrument(skip_all)]
    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
            }

            Statement::DeclareCursor(declare_cursor) => {
                self.declare_cursor(declare_cursor, query_ctx).await
            }
            Statement::FetchCursor(fetch_cursor) => {
                self.fetch_cursor(fetch_cursor, query_ctx).await
            }
            Statement::CloseCursor(close_cursor) => {
                self.close_cursor(close_cursor, query_ctx).await
            }

            Statement::Insert(insert) => self.insert(insert, query_ctx).await,

            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,

            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,

            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,

            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,

            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,

            #[cfg(feature = "enterprise")]
            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,

            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
                let query_output = self
                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
                    .await?;
                let req = to_copy_query_request(stmt.arg)?;

                self.copy_query_to(req, query_output)
                    .await
                    .map(Output::new_with_affected_rows)
            }

            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                let req = to_copy_table_request(stmt, query_ctx.clone())?;
                match req.direction {
                    CopyDirection::Export => self
                        .copy_table_to(req, query_ctx)
                        .await
                        .map(Output::new_with_affected_rows),
                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
                }
            }

            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
                match copy_database {
                    CopyDatabase::To(arg) => {
                        self.copy_database_to(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx.clone(),
                        )
                        .await
                    }
                    CopyDatabase::From(arg) => {
                        self.copy_database_from(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx,
                        )
                        .await
                    }
                }
            }

            Statement::CreateTable(stmt) => {
                let _ = self.create_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateTableLike(stmt) => {
                let _ = self.create_table_like(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateExternalTable(stmt) => {
                let _ = self.create_external_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
            Statement::DropFlow(stmt) => {
                self.drop_flow(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.flow_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            #[cfg(feature = "enterprise")]
            Statement::DropTrigger(stmt) => {
                self.drop_trigger(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.trigger_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::CreateView(stmt) => {
                let _ = self.create_view(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::DropView(stmt) => {
                let (catalog_name, schema_name, view_name) =
                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                self.drop_view(
                    catalog_name,
                    schema_name,
                    view_name,
                    stmt.drop_if_exists,
                    query_ctx,
                )
                .await
            }
            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,

            Statement::AlterDatabase(alter_database) => {
                self.alter_database(alter_database, query_ctx).await
            }

            #[cfg(feature = "enterprise")]
            Statement::AlterTrigger(alter_trigger) => {
                self.alter_trigger(alter_trigger, query_ctx).await
            }

            Statement::DropTable(stmt) => {
                let mut table_names = Vec::with_capacity(stmt.table_names().len());
                for table_name_stmt in stmt.table_names() {
                    let (catalog, schema, table) =
                        table_idents_to_full_name(table_name_stmt, &query_ctx)
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                    table_names.push(TableName::new(catalog, schema, table));
                }
                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
                    .await
            }
            Statement::DropDatabase(stmt) => {
                self.drop_database(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::TruncateTable(stmt) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_name = TableName::new(catalog, schema, table);
                let time_ranges = self
                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
                    .await?;
                self.truncate_table(table_name, time_ranges, query_ctx)
                    .await
            }
            Statement::CreateDatabase(stmt) => {
                self.create_database(
                    &format_raw_object_name(&stmt.name),
                    stmt.if_not_exists,
                    stmt.options.into_map(),
                    query_ctx,
                )
                .await
            }
            Statement::ShowCreateDatabase(show) => {
                let (catalog, database) =
                    idents_to_full_database_name(&show.database_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_metadata_manager = self
                    .catalog_manager
                    .as_any()
                    .downcast_ref::<KvBackendCatalogManager>()
                    .map(|manager| manager.table_metadata_manager_ref().clone())
                    .context(UpgradeCatalogManagerRefSnafu)?;
                let opts: HashMap<String, String> = table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog, &database))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    .context(SchemaNotFoundSnafu {
                        schema_info: &database,
                    })?
                    .into_inner()
                    .into();

                self.show_create_database(&database, opts.into()).await
            }
            Statement::ShowCreateTable(show) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(&show.table_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                let table_ref = self
                    .catalog_manager
                    .table(&catalog, &schema, &table, Some(&query_ctx))
                    .await
                    .context(CatalogSnafu)?
                    .context(TableNotFoundSnafu { table_name: &table })?;
                let table_name = TableName::new(catalog, schema, table);

                match show.variant {
                    ShowCreateTableVariant::Original => {
                        self.show_create_table(table_name, table_ref, query_ctx)
                            .await
                    }
                    ShowCreateTableVariant::PostgresForeignTable => {
                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
                            .await
                    }
                }
            }
            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
            Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
            Statement::ShowColumns(show_columns) => {
                self.show_columns(show_columns, query_ctx).await
            }
            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
            Statement::Use(db) => self.use_database(db, query_ctx).await,
            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
        }
    }

    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
        let catalog = query_ctx.current_catalog();
        ensure!(
            self.catalog_manager
                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
                .await
                .context(CatalogSnafu)?,
            SchemaNotFoundSnafu { schema_info: &db }
        );

        query_ctx.set_current_schema(&db);

        Ok(Output::new_with_record_batches(RecordBatches::empty()))
    }

    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
        let var_name = set_var.variable.to_string().to_uppercase();

        debug!(
            "Trying to set {}={} for session: {} ",
            var_name,
            set_var.value.iter().map(|e| e.to_string()).join(", "),
            query_ctx.conn_info()
        );

        match var_name.as_str() {
            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,

            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
                set_timezone(set_var.value, query_ctx)?
            }

            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,

            // Same as "bytea_output", we just ignore it here.
            // This is harmless since it only affects how dates are rendered in the client's output.
            // Tracked in https://github.com/GreptimeTeam/greptimedb/issues/3442.
            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
            "INTERVALSTYLE" => set_intervalstyle(set_var.value, query_ctx)?,

            // Allow queries to fall back when push-down fails.
            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,

            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Postgres => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            "STATEMENT_TIMEOUT" => match query_ctx.channel() {
                Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Mysql => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            "SEARCH_PATH" => {
                if query_ctx.channel() == Channel::Postgres {
                    set_search_path(set_var.value, query_ctx)?
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
            _ => {
                if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
                {
                    // For unknown SET statements, return success with a warning.
                    // This prevents the SET call from blocking MySQL/Postgres clients
                    // during connection establishment.
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
        }
        Ok(Output::new_with_affected_rows(0))
    }

    #[tracing::instrument(skip_all)]
    pub async fn plan(
        &self,
        stmt: &QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .plan(stmt, query_ctx)
            .await
            .context(PlanStatementSnafu)
    }

    /// Execute [`LogicalPlan`] directly.
    #[tracing::instrument(skip_all)]
    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .optimize(plan)
            .context(PlanStatementSnafu)
    }

    #[tracing::instrument(skip_all)]
    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
        let plan = self.plan(&stmt, query_ctx.clone()).await?;
        self.exec_plan(plan, query_ctx).await
    }

    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
        let TableReference {
            catalog,
            schema,
            table,
        } = table_ref;
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }

    /// Converts the truncate time ranges for the given table from SQL values to timestamps.
    pub async fn convert_truncate_time_ranges(
        &self,
        table_name: &TableName,
        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
        query_ctx: &QueryContextRef,
    ) -> Result<Vec<(Timestamp, Timestamp)>> {
        if sql_values_time_range.is_empty() {
            return Ok(vec![]);
        }
        let table = self.get_table(&table_name.table_ref()).await?;
        let info = table.table_info();
        let time_index_dt = info
            .meta
            .schema
            .timestamp_column()
            .context(UnexpectedSnafu {
                violated: "Table must have a timestamp column",
            })?;

        let time_unit = time_index_dt
            .data_type
            .as_timestamp()
            .with_context(|| UnexpectedSnafu {
                violated: format!(
                    "Table {}'s time index column must be a timestamp type, found: {:?}",
                    table_name, time_index_dt
                ),
            })?
            .unit();

        let start_column = ColumnSchema::new(
            "range_start",
            ConcreteDataType::timestamp_datatype(time_unit),
            false,
        );
        let end_column = ColumnSchema::new(
            "range_end",
            ConcreteDataType::timestamp_datatype(time_unit),
            false,
        );
        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
        for (start, end) in sql_values_time_range {
            let start = common_sql::convert::sql_value_to_value(
                &start_column,
                start,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;

            let end = common_sql::convert::sql_value_to_value(
                &end_column,
                end,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;
            time_ranges.push((start, end));
        }
        Ok(time_ranges)
    }

    /// Returns the inserter for the statement executor.
    pub(crate) fn inserter(&self) -> &InserterRef {
        &self.inserter
    }
}

fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
    let CopyQueryToArgument {
        with,
        connection,
        location,
    } = stmt;

    Ok(CopyQueryToRequest {
        location,
        with: with.into_map(),
        connection: connection.into_map(),
    })
}

// Verifies that the time-related format options are valid.
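// For example, a map with `FORMAT_TYPE = "csv"` and a valid strftime `TIMESTAMP_FORMAT`
// such as "%Y-%m-%d %H:%M:%S" passes, while a time-related format combined with a
// non-CSV `FORMAT_TYPE`, or an invalid strftime string, is rejected (see the unit
// tests at the bottom of this file).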
fn verify_time_related_format(with: &OptionMap) -> Result<()> {
    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);

    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
        ensure!(
            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
            error::TimestampFormatNotSupportedSnafu {
                format: "<unknown>".to_string(),
                file_format: file_format.unwrap_or_default(),
            }
        );
    }

    for (key, format_opt) in [
        (common_datasource::file_format::TIME_FORMAT, time_format),
        (common_datasource::file_format::DATE_FORMAT, date_format),
        (
            common_datasource::file_format::TIMESTAMP_FORMAT,
            timestamp_format,
        ),
    ] {
        if let Some(format) = format_opt {
            chrono::format::strftime::StrftimeItems::new(format)
                .parse()
                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
        }
    }

    Ok(())
}

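/// Converts a [CopyTable] statement into a [CopyTableRequest]: resolves the full table
/// name against `query_ctx`, extracts the optional timestamp range and file pattern from
/// the `WITH` options, and validates any time-related format options.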
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
    let direction = match stmt {
        CopyTable::To(_) => CopyDirection::Export,
        CopyTable::From(_) => CopyDirection::Import,
    };

    let CopyTableArgument {
        location,
        connection,
        with,
        table_name,
        limit,
        ..
    } = match stmt {
        CopyTable::To(arg) => arg,
        CopyTable::From(arg) => arg,
    };
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&table_name, &query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;

    verify_time_related_format(&with)?;

    let pattern = with
        .get(common_datasource::file_format::FILE_PATTERN)
        .map(|x| x.to_string());

    Ok(CopyTableRequest {
        catalog_name,
        schema_name,
        table_name,
        location,
        with: with.into_map(),
        connection: connection.into_map(),
        pattern,
        direction,
        timestamp_range,
        limit,
    })
}

/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
/// This function extracts the necessary info including catalog/database name, time range, etc.
fn to_copy_database_request(
    arg: CopyDatabaseArgument,
    query_ctx: &QueryContextRef,
) -> Result<CopyDatabaseRequest> {
    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;

    Ok(CopyDatabaseRequest {
        catalog_name,
        schema_name: database_name,
        location: arg.location,
        with: arg.with.into_map(),
        connection: arg.connection.into_map(),
        time_range,
    })
}

/// Extracts a timestamp range from an [OptionMap] using the keys `start_time` and `end_time`.
/// Each value must be a valid timestamp string as defined in [Timestamp::from_str].
/// The conversion uses the timezone carried in `query_ctx`.
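///
/// A minimal usage sketch, mirroring the unit test at the bottom of this file:
///
/// ```ignore
/// let query_ctx = QueryContextBuilder::default().build().into();
/// let options = OptionMap::from(
///     [
///         (COPY_DATABASE_TIME_START_KEY.to_string(), "2022-04-11 08:00:00".to_string()),
///         (COPY_DATABASE_TIME_END_KEY.to_string(), "2022-04-11 16:00:00".to_string()),
///     ]
///     .into_iter()
///     .collect::<HashMap<_, _>>(),
/// );
/// // Yields `Some(TimestampRange)` spanning the two instants in the session timezone.
/// let range = timestamp_range_from_option_map(&options, &query_ctx)?;
/// ```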
fn timestamp_range_from_option_map(
    options: &OptionMap,
    query_ctx: &QueryContextRef,
) -> Result<Option<TimestampRange>> {
    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
    let time_range = match (start_timestamp, end_timestamp) {
        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
            error::InvalidTimestampRangeSnafu {
                start: start.to_iso8601_string(),
                end: end.to_iso8601_string(),
            }
        })?),
        (Some(start), None) => Some(TimestampRange::from_start(start)),
        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
        (None, None) => None,
    };
    Ok(time_range)
}

/// Extracts a timestamp from the given [OptionMap] by key.
fn extract_timestamp(
    map: &OptionMap,
    key: &str,
    query_ctx: &QueryContextRef,
) -> Result<Option<Timestamp>> {
    map.get(key)
        .map(|v| {
            Timestamp::from_str(v, Some(&query_ctx.timezone()))
                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
        })
        .transpose()
}

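/// Resolves an [ObjectName] into a `(catalog, database)` pair: a single identifier uses the
/// current catalog from `query_ctx` (e.g. `public` becomes `(<current catalog>, "public")`),
/// while `catalog.database` is split as written; anything else is rejected as invalid SQL.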
fn idents_to_full_database_name(
    obj_name: &ObjectName,
    query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
    match &obj_name.0[..] {
        [database] => Ok((
            query_ctx.current_catalog().to_owned(),
            database.to_string_unquoted(),
        )),
        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
            ),
        }
        .fail(),
    }
}

/// The [`Inserter`] implementation for the statement executor.
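///
/// A minimal construction sketch (assuming a `StatementExecutorRef` named
/// `statement_executor` is already available from the surrounding setup):
///
/// ```ignore
/// // Without options, `insert_rows` builds a plain QueryContext from the request's
/// // catalog/schema; with options it also sets the TTL and append-mode extensions
/// // before forwarding the rows.
/// let inserter = InserterImpl::new(statement_executor.clone(), None);
/// ```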
pub struct InserterImpl {
    statement_executor: StatementExecutorRef,
    options: Option<InsertOptions>,
}

impl InserterImpl {
    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
        Self {
            statement_executor,
            options,
        }
    }
}

#[async_trait::async_trait]
impl Inserter for InserterImpl {
    async fn insert_rows(
        &self,
        context: &client::inserter::Context<'_>,
        requests: RowInsertRequests,
    ) -> ClientResult<()> {
        let mut ctx_builder = QueryContextBuilder::default()
            .current_catalog(context.catalog.to_string())
            .current_schema(context.schema.to_string());
        if let Some(options) = self.options.as_ref() {
            ctx_builder = ctx_builder
                .set_extension(
                    TTL_KEY.to_string(),
                    format_duration(options.ttl).to_string(),
                )
                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
        }
        let query_ctx = ctx_builder.build().into();

        self.statement_executor
            .inserter()
            .handle_row_inserts(
                requests,
                query_ctx,
                self.statement_executor.as_ref(),
                false,
                false,
            )
            .await
            .map_err(BoxedError::new)
            .context(ClientExternalSnafu)
            .map(|_| ())
    }

    fn set_options(&mut self, options: &InsertOptions) {
        self.options = Some(*options);
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let map = OptionMap::from(
            [
                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        assert_eq!(
            Some(
                TimestampRange::new(
                    Timestamp::new_second(1649635200),
                    Timestamp::new_second(1649664000),
                )
                .unwrap(),
            ),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
        );

        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        assert!(verify_time_related_format(&map).is_ok());

        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "json".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%111112".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}