operator/statement.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod admin;
mod comment;
mod copy_database;
mod copy_query_to;
mod copy_table_from;
mod copy_table_to;
mod cursor;
pub mod ddl;
mod describe;
mod dml;
mod kill;
mod set;
mod show;
mod tql;

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::RowInsertRequests;
use catalog::CatalogManagerRef;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use client::RecordBatches;
use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
use client::inserter::{InsertOptions, Inserter};
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::{debug, tracing, warn};
use common_time::Timestamp;
use common_time::range::TimestampRange;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use humantime::format_duration;
use itertools::Itertools;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::QueryEngineRef;
use query::parser::QueryStatement;
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use set::{set_query_timeout, set_read_preference};
use snafu::{OptionExt, ResultExt, ensure};
use sql::ast::ObjectNamePartExt;
use sql::statements::OptionMap;
use sql::statements::copy::{
    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
};
use sql::statements::set_variables::SetVariables;
use sql::statements::show::ShowCreateTableVariant;
use sql::statements::statement::Statement;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use table::TableRef;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
use table::table_name::TableName;
use table::table_reference::TableReference;

use self::set::{
    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
};
use crate::error::{
    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::statement::set::set_allow_query_fallback;

/// A configurator that customizes or enhances a [`StatementExecutor`].
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
    async fn configure(
        &self,
        executor: StatementExecutor,
        ctx: ExecutorConfigureContext,
    ) -> std::result::Result<StatementExecutor, BoxedError>;
}

pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;

pub struct ExecutorConfigureContext {
    pub kv_backend: KvBackendRef,
}

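/// Executes SQL statements on behalf of the frontend. Bundles the catalog manager, query
/// engine, procedure executor, metadata managers, and inserter required to plan and run
/// queries, DDL, DML, and administrative commands.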
#[derive(Clone)]
pub struct StatementExecutor {
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    procedure_executor: ProcedureExecutorRef,
    table_metadata_manager: TableMetadataManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    view_info_manager: ViewInfoManagerRef,
    partition_manager: PartitionRuleManagerRef,
    cache_invalidator: CacheInvalidatorRef,
    inserter: InserterRef,
    process_manager: Option<ProcessManagerRef>,
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}

pub type StatementExecutorRef = Arc<StatementExecutor>;

/// Trait for querying trigger info, e.g. handling `SHOW CREATE TRIGGER`.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Queries the `SHOW CREATE TRIGGER` result for the given trigger.
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    fn as_any(&self) -> &dyn std::any::Any;
}

#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;

impl StatementExecutor {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
        procedure_executor: ProcedureExecutorRef,
        kv_backend: KvBackendRef,
        cache_invalidator: CacheInvalidatorRef,
        inserter: InserterRef,
        table_route_cache: TableRouteCacheRef,
        process_manager: Option<ProcessManagerRef>,
    ) -> Self {
        Self {
            catalog_manager,
            query_engine,
            procedure_executor,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
            cache_invalidator,
            inserter,
            process_manager,
            #[cfg(feature = "enterprise")]
            trigger_querier: None,
        }
    }

    #[cfg(feature = "enterprise")]
    pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
        self.trigger_querier = Some(querier);
        self
    }

    #[cfg(feature = "testing")]
    pub async fn execute_stmt(
        &self,
        stmt: QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        match stmt {
            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
            QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
        }
    }

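    /// Executes a single SQL [`Statement`], dispatching each variant to its handler:
    /// queries, EXPLAIN and DELETE are planned and run by the query engine, while DDL,
    /// COPY, SHOW, SET and administrative statements are handled by the dedicated methods
    /// below.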
    #[tracing::instrument(skip_all)]
    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
            }

            Statement::DeclareCursor(declare_cursor) => {
                self.declare_cursor(declare_cursor, query_ctx).await
            }
            Statement::FetchCursor(fetch_cursor) => {
                self.fetch_cursor(fetch_cursor, query_ctx).await
            }
            Statement::CloseCursor(close_cursor) => {
                self.close_cursor(close_cursor, query_ctx).await
            }

            Statement::Insert(insert) => self.insert(insert, query_ctx).await,

            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,

            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,

            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,

            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,

            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,

            #[cfg(feature = "enterprise")]
            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,

            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
                let query_output = self
                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
                    .await?;
                let req = to_copy_query_request(stmt.arg)?;

                self.copy_query_to(req, query_output)
                    .await
                    .map(Output::new_with_affected_rows)
            }

            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                let req = to_copy_table_request(stmt, query_ctx.clone())?;
                match req.direction {
                    CopyDirection::Export => self
                        .copy_table_to(req, query_ctx)
                        .await
                        .map(Output::new_with_affected_rows),
                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
                }
            }

            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
                match copy_database {
                    CopyDatabase::To(arg) => {
                        self.copy_database_to(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx.clone(),
                        )
                        .await
                    }
                    CopyDatabase::From(arg) => {
                        self.copy_database_from(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx,
                        )
                        .await
                    }
                }
            }

            Statement::CreateTable(stmt) => {
                let _ = self.create_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateTableLike(stmt) => {
                let _ = self.create_table_like(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateExternalTable(stmt) => {
                let _ = self.create_external_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
            Statement::DropFlow(stmt) => {
                self.drop_flow(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.flow_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            #[cfg(feature = "enterprise")]
            Statement::DropTrigger(stmt) => {
                self.drop_trigger(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.trigger_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::CreateView(stmt) => {
                let _ = self.create_view(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::DropView(stmt) => {
                let (catalog_name, schema_name, view_name) =
                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                self.drop_view(
                    catalog_name,
                    schema_name,
                    view_name,
                    stmt.drop_if_exists,
                    query_ctx,
                )
                .await
            }
            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,

            Statement::AlterDatabase(alter_database) => {
                self.alter_database(alter_database, query_ctx).await
            }

            #[cfg(feature = "enterprise")]
            Statement::AlterTrigger(alter_trigger) => {
                self.alter_trigger(alter_trigger, query_ctx).await
            }

            Statement::DropTable(stmt) => {
                let mut table_names = Vec::with_capacity(stmt.table_names().len());
                for table_name_stmt in stmt.table_names() {
                    let (catalog, schema, table) =
                        table_idents_to_full_name(table_name_stmt, &query_ctx)
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                    table_names.push(TableName::new(catalog, schema, table));
                }
                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
                    .await
            }
            Statement::DropDatabase(stmt) => {
                self.drop_database(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::TruncateTable(stmt) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_name = TableName::new(catalog, schema, table);
                let time_ranges = self
                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
                    .await?;
                self.truncate_table(table_name, time_ranges, query_ctx)
                    .await
            }
            Statement::CreateDatabase(stmt) => {
                self.create_database(
                    &format_raw_object_name(&stmt.name),
                    stmt.if_not_exists,
                    stmt.options.into_map(),
                    query_ctx,
                )
                .await
            }
            Statement::ShowCreateDatabase(show) => {
                let (catalog, database) =
                    idents_to_full_database_name(&show.database_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_metadata_manager = self
                    .catalog_manager
                    .as_any()
                    .downcast_ref::<KvBackendCatalogManager>()
                    .map(|manager| manager.table_metadata_manager_ref().clone())
                    .context(UpgradeCatalogManagerRefSnafu)?;
                let opts: HashMap<String, String> = table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog, &database))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    .context(SchemaNotFoundSnafu {
                        schema_info: &database,
                    })?
                    .into_inner()
                    .into();

                self.show_create_database(&database, opts.into()).await
            }
            Statement::ShowCreateTable(show) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(&show.table_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                let table_ref = self
                    .catalog_manager
                    .table(&catalog, &schema, &table, Some(&query_ctx))
                    .await
                    .context(CatalogSnafu)?
                    .context(TableNotFoundSnafu { table_name: &table })?;
                let table_name = TableName::new(catalog, schema, table);

                match show.variant {
                    ShowCreateTableVariant::Original => {
                        self.show_create_table(table_name, table_ref, query_ctx)
                            .await
                    }
                    ShowCreateTableVariant::PostgresForeignTable => {
                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
                            .await
                    }
                }
            }
            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
            Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
            Statement::ShowColumns(show_columns) => {
                self.show_columns(show_columns, query_ctx).await
            }
            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
            Statement::Use(db) => self.use_database(db, query_ctx).await,
            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
        }
    }

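    /// Handles `USE <database>`: verifies that the schema exists in the current catalog and
    /// switches the session's current schema to it.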
    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
        let catalog = query_ctx.current_catalog();
        ensure!(
            self.catalog_manager
                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
                .await
                .context(CatalogSnafu)?,
            SchemaNotFoundSnafu { schema_info: &db }
        );

        query_ctx.set_current_schema(&db);

        Ok(Output::new_with_record_batches(RecordBatches::empty()))
    }

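    /// Handles `SET <variable> = <value>` statements. Only a small set of session variables
    /// is recognized; unknown variables on the MySQL/Postgres channels produce a warning
    /// rather than an error, so client drivers that issue SET during connection setup are
    /// not blocked.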
    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
        let var_name = set_var.variable.to_string().to_uppercase();

        debug!(
            "Trying to set {}={} for session: {} ",
            var_name,
            set_var.value.iter().map(|e| e.to_string()).join(", "),
            query_ctx.conn_info()
        );

        match var_name.as_str() {
            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,

            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
                set_timezone(set_var.value, query_ctx)?
            }

            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,

            // Same as "bytea_output": we effectively ignore it here.
            // Not harmful, since it only affects how dates are displayed in the client app's output.
            // Tracking issue: https://github.com/GreptimeTeam/greptimedb/issues/3442.
            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,

            // Allow queries to fall back when push-down fails.
            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,

            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Postgres => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            "STATEMENT_TIMEOUT" => match query_ctx.channel() {
                Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Mysql => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            "SEARCH_PATH" => {
                if query_ctx.channel() == Channel::Postgres {
                    set_search_path(set_var.value, query_ctx)?
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
            _ => {
                if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
                {
                    // For unknown SET statements, we return success with a warning.
                    // This keeps the SET call from blocking MySQL/Postgres clients during
                    // connection establishment.
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
        }
        Ok(Output::new_with_affected_rows(0))
    }

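    /// Plans the given [`QueryStatement`] into a [`LogicalPlan`] via the query engine's planner.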
    #[tracing::instrument(skip_all)]
    pub async fn plan(
        &self,
        stmt: &QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .plan(stmt, query_ctx)
            .await
            .context(PlanStatementSnafu)
    }

    /// Execute [`LogicalPlan`] directly.
    #[tracing::instrument(skip_all)]
    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .optimize(plan)
            .context(PlanStatementSnafu)
    }

    #[tracing::instrument(skip_all)]
    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
        let plan = self.plan(&stmt, query_ctx.clone()).await?;
        self.exec_plan(plan, query_ctx).await
    }

    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
        let TableReference {
            catalog,
            schema,
            table,
        } = table_ref;
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }

    /// Converts truncate time ranges for the given table from SQL values to timestamps.
    ///
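    /// The string values are parsed using the session timezone from `query_ctx`, and the
    /// resulting timestamps use the unit of the table's time index column.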
    pub async fn convert_truncate_time_ranges(
        &self,
        table_name: &TableName,
        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
        query_ctx: &QueryContextRef,
    ) -> Result<Vec<(Timestamp, Timestamp)>> {
        if sql_values_time_range.is_empty() {
            return Ok(vec![]);
        }
        let table = self.get_table(&table_name.table_ref()).await?;
        let info = table.table_info();
        let time_index_dt = info
            .meta
            .schema
            .timestamp_column()
            .context(UnexpectedSnafu {
                violated: "Table must have a timestamp column",
            })?;

        let time_unit = time_index_dt
            .data_type
            .as_timestamp()
            .with_context(|| UnexpectedSnafu {
                violated: format!(
                    "Table {}'s time index column must be a timestamp type, found: {:?}",
                    table_name, time_index_dt
                ),
            })?
            .unit();

        let start_column = ColumnSchema::new(
            "range_start",
            ConcreteDataType::timestamp_datatype(time_unit),
            false,
        );
        let end_column = ColumnSchema::new(
            "range_end",
            ConcreteDataType::timestamp_datatype(time_unit),
            false,
        );
        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
        for (start, end) in sql_values_time_range {
            let start = common_sql::convert::sql_value_to_value(
                &start_column,
                start,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;

            let end = common_sql::convert::sql_value_to_value(
                &end_column,
                end,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;
            time_ranges.push((start, end));
        }
        Ok(time_ranges)
    }

    /// Returns the inserter for the statement executor.
    pub(crate) fn inserter(&self) -> &InserterRef {
        &self.inserter
    }
}

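/// Converts a [`CopyQueryToArgument`] into a [`CopyQueryToRequest`].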
fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
    let CopyQueryToArgument {
        with,
        connection,
        location,
    } = stmt;

    Ok(CopyQueryToRequest {
        location,
        with: with.into_map(),
        connection: connection.into_map(),
    })
}

// Verifies that the time-related format options are valid.
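//
// For example, a `%Y-%m-%d %H:%M:%S` timestamp format is accepted for CSV copies, while
// time-related formats for non-CSV copies, or invalid strftime patterns, are rejected
// (see the tests at the bottom of this file).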
fn verify_time_related_format(with: &OptionMap) -> Result<()> {
    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);

    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
        ensure!(
            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
            error::TimestampFormatNotSupportedSnafu {
                format: "<unknown>".to_string(),
                file_format: file_format.unwrap_or_default(),
            }
        );
    }

    for (key, format_opt) in [
        (common_datasource::file_format::TIME_FORMAT, time_format),
        (common_datasource::file_format::DATE_FORMAT, date_format),
        (
            common_datasource::file_format::TIMESTAMP_FORMAT,
            timestamp_format,
        ),
    ] {
        if let Some(format) = format_opt {
            chrono::format::strftime::StrftimeItems::new(format)
                .parse()
                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
        }
    }

    Ok(())
}

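/// Converts a [`CopyTable`] statement into a [`CopyTableRequest`], resolving the fully
/// qualified table name and validating the time-related options in the `with` clause.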
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
    let direction = match stmt {
        CopyTable::To(_) => CopyDirection::Export,
        CopyTable::From(_) => CopyDirection::Import,
    };

    let CopyTableArgument {
        location,
        connection,
        with,
        table_name,
        limit,
        ..
    } = match stmt {
        CopyTable::To(arg) => arg,
        CopyTable::From(arg) => arg,
    };
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&table_name, &query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;

    verify_time_related_format(&with)?;

    let pattern = with
        .get(common_datasource::file_format::FILE_PATTERN)
        .map(|x| x.to_string());

    Ok(CopyTableRequest {
        catalog_name,
        schema_name,
        table_name,
        location,
        with: with.into_map(),
        connection: connection.into_map(),
        pattern,
        direction,
        timestamp_range,
        limit,
    })
}

/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
/// This function extracts the necessary info including catalog/database name, time range, etc.
fn to_copy_database_request(
    arg: CopyDatabaseArgument,
    query_ctx: &QueryContextRef,
) -> Result<CopyDatabaseRequest> {
    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;

    Ok(CopyDatabaseRequest {
        catalog_name,
        schema_name: database_name,
        location: arg.location,
        with: arg.with.into_map(),
        connection: arg.connection.into_map(),
        time_range,
    })
}

/// Extracts a timestamp range from an [`OptionMap`] using the `start_time` and `end_time` keys.
/// The values must be valid timestamp strings as accepted by [`Timestamp::from_str`].
/// Conversion uses the timezone carried by `query_ctx`.
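///
/// For example, with an `Asia/Shanghai` session timezone, `start_time = '2022-04-11 08:00:00'`
/// and `end_time = '2022-04-11 16:00:00'` yield the range from second 1649635200 to second
/// 1649664000 (see `test_timestamp_range_from_option_map` below).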
fn timestamp_range_from_option_map(
    options: &OptionMap,
    query_ctx: &QueryContextRef,
) -> Result<Option<TimestampRange>> {
    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
    let time_range = match (start_timestamp, end_timestamp) {
        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
            error::InvalidTimestampRangeSnafu {
                start: start.to_iso8601_string(),
                end: end.to_iso8601_string(),
            }
        })?),
        (Some(start), None) => Some(TimestampRange::from_start(start)),
        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
        (None, None) => None,
    };
    Ok(time_range)
}

/// Extracts a timestamp from the given [`OptionMap`] by key.
fn extract_timestamp(
    map: &OptionMap,
    key: &str,
    query_ctx: &QueryContextRef,
) -> Result<Option<Timestamp>> {
    map.get(key)
        .map(|v| {
            Timestamp::from_str(v, Some(&query_ctx.timezone()))
                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
        })
        .transpose()
}

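/// Resolves an [`ObjectName`] into a `(catalog, database)` pair, falling back to the
/// session's current catalog when only a database name is given.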
fn idents_to_full_database_name(
    obj_name: &ObjectName,
    query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
    match &obj_name.0[..] {
        [database] => Ok((
            query_ctx.current_catalog().to_owned(),
            database.to_string_unquoted(),
        )),
        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
            ),
        }
        .fail(),
    }
}

/// The [`Inserter`] implementation for the statement executor.
pub struct InserterImpl {
    statement_executor: StatementExecutorRef,
    options: Option<InsertOptions>,
}

impl InserterImpl {
    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
        Self {
            statement_executor,
            options,
        }
    }
}

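// The `Inserter` implementation below builds a `QueryContext` from the request's
// catalog/schema (plus optional TTL and append-mode extensions) and delegates to the
// statement executor's row inserter.
//
// A minimal wiring sketch (assuming an existing `StatementExecutorRef` named
// `statement_executor`; constructing `InsertOptions` is omitted here):
//
//     let inserter = InserterImpl::new(statement_executor.clone(), None);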
#[async_trait::async_trait]
impl Inserter for InserterImpl {
    async fn insert_rows(
        &self,
        context: &client::inserter::Context<'_>,
        requests: RowInsertRequests,
    ) -> ClientResult<()> {
        let mut ctx_builder = QueryContextBuilder::default()
            .current_catalog(context.catalog.to_string())
            .current_schema(context.schema.to_string());
        if let Some(options) = self.options.as_ref() {
            ctx_builder = ctx_builder
                .set_extension(
                    TTL_KEY.to_string(),
                    format_duration(options.ttl).to_string(),
                )
                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
        }
        let query_ctx = ctx_builder.build().into();

        self.statement_executor
            .inserter()
            .handle_row_inserts(
                requests,
                query_ctx,
                self.statement_executor.as_ref(),
                false,
                false,
            )
            .await
            .map_err(BoxedError::new)
            .context(ClientExternalSnafu)
            .map(|_| ())
    }

    fn set_options(&mut self, options: &InsertOptions) {
        self.options = Some(*options);
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let map = OptionMap::from(
            [
                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        assert_eq!(
            Some(
                TimestampRange::new(
                    Timestamp::new_second(1649635200),
                    Timestamp::new_second(1649664000),
                )
                .unwrap(),
            ),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap()
        );

        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        assert!(verify_time_related_format(&map).is_ok());

        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "json".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%111112".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}