operator/
statement.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::v1::RowInsertRequests;
33use catalog::CatalogManagerRef;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::process_manager::ProcessManagerRef;
36use client::RecordBatches;
37use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
38use client::inserter::{InsertOptions, Inserter};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::tracing;
50use common_time::Timestamp;
51use common_time::range::TimestampRange;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use humantime::format_duration;
55use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
56use query::QueryEngineRef;
57use query::parser::QueryStatement;
58use session::context::{Channel, QueryContextBuilder, QueryContextRef};
59use session::table_name::table_idents_to_full_name;
60use set::{set_query_timeout, set_read_preference};
61use snafu::{OptionExt, ResultExt, ensure};
62use sql::ast::ObjectNamePartExt;
63use sql::statements::OptionMap;
64use sql::statements::copy::{
65    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
66};
67use sql::statements::set_variables::SetVariables;
68use sql::statements::show::ShowCreateTableVariant;
69use sql::statements::statement::Statement;
70use sql::util::format_raw_object_name;
71use sqlparser::ast::ObjectName;
72use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
73use table::TableRef;
74use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
75use table::table_name::TableName;
76use table::table_reference::TableReference;
77
78use self::set::{
79    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
80};
81use crate::error::{
82    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
83    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
84    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
85};
86use crate::insert::InserterRef;
87use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
88use crate::statement::set::set_allow_query_fallback;
89
/// Dispatches parsed statements (queries, DDL, DML, `COPY`, `SHOW`, `SET`, ...) to the
/// components that carry them out; see [`StatementExecutor::execute_sql`].
///
/// The struct is `Clone`, and is shared behind an `Arc` through [`StatementExecutorRef`].
#[derive(Clone)]
pub struct StatementExecutor {
    /// Used to check schema existence and to look up tables.
    catalog_manager: CatalogManagerRef,
    /// Plans, optimizes and executes logical plans.
    query_engine: QueryEngineRef,
    /// Exposed through [`StatementExecutor::procedure_executor`].
    procedure_executor: ProcedureExecutorRef,
    /// Built in [`StatementExecutor::new`] on top of the kv backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Built in [`StatementExecutor::new`] on top of the kv backend.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Built in [`StatementExecutor::new`] on top of the kv backend.
    view_info_manager: ViewInfoManagerRef,
    /// Built in [`StatementExecutor::new`] from the kv backend and the table route cache.
    partition_manager: PartitionRuleManagerRef,
    /// Exposed through [`StatementExecutor::cache_invalidator`].
    cache_invalidator: CacheInvalidatorRef,
    /// Exposed to the crate through `inserter()`.
    inserter: InserterRef,
    /// Optional process manager supplied by the caller of [`StatementExecutor::new`].
    process_manager: Option<ProcessManagerRef>,
    /// Enterprise only: `None` after `new`, set via `with_trigger_querier`.
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}
105
/// Shared, reference-counted handle to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
107
/// Trait for creating [`TriggerQuerier`] instance.
///
/// Only available with the `enterprise` feature.
#[cfg(feature = "enterprise")]
pub trait TriggerQuerierFactory: Send + Sync {
    /// Creates a [`TriggerQuerier`] from the given kv backend.
    fn create(&self, kv_backend: KvBackendRef) -> TriggerQuerierRef;
}
113
/// Shared, reference-counted handle to a [`TriggerQuerierFactory`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerQuerierFactoryRef = Arc<dyn TriggerQuerierFactory>;
116
/// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc.
///
/// Only available with the `enterprise` feature.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Answers a `SHOW CREATE TRIGGER` statement for the trigger named `trigger`
    /// in `catalog`.
    ///
    /// Failures are reported as a [`BoxedError`].
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    /// Exposes the querier as [`std::any::Any`] — presumably so callers can downcast
    /// to the concrete implementation (TODO confirm against implementors).
    fn as_any(&self) -> &dyn std::any::Any;
}
131
/// Shared, reference-counted handle to a [`TriggerQuerier`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
134
135impl StatementExecutor {
136    #[allow(clippy::too_many_arguments)]
137    pub fn new(
138        catalog_manager: CatalogManagerRef,
139        query_engine: QueryEngineRef,
140        procedure_executor: ProcedureExecutorRef,
141        kv_backend: KvBackendRef,
142        cache_invalidator: CacheInvalidatorRef,
143        inserter: InserterRef,
144        table_route_cache: TableRouteCacheRef,
145        process_manager: Option<ProcessManagerRef>,
146    ) -> Self {
147        Self {
148            catalog_manager,
149            query_engine,
150            procedure_executor,
151            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
152            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
153            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
154            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
155            cache_invalidator,
156            inserter,
157            process_manager,
158            #[cfg(feature = "enterprise")]
159            trigger_querier: None,
160        }
161    }
162
163    #[cfg(feature = "enterprise")]
164    pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
165        self.trigger_querier = Some(querier);
166        self
167    }
168
169    #[cfg(feature = "testing")]
170    pub async fn execute_stmt(
171        &self,
172        stmt: QueryStatement,
173        query_ctx: QueryContextRef,
174    ) -> Result<Output> {
175        match stmt {
176            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
177            QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
178        }
179    }
180
181    #[tracing::instrument(skip_all)]
182    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
183        match stmt {
184            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
185                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
186            }
187
188            Statement::DeclareCursor(declare_cursor) => {
189                self.declare_cursor(declare_cursor, query_ctx).await
190            }
191            Statement::FetchCursor(fetch_cursor) => {
192                self.fetch_cursor(fetch_cursor, query_ctx).await
193            }
194            Statement::CloseCursor(close_cursor) => {
195                self.close_cursor(close_cursor, query_ctx).await
196            }
197
198            Statement::Insert(insert) => self.insert(insert, query_ctx).await,
199
200            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
201
202            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
203
204            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
205
206            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
207
208            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
209
210            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
211
212            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
213
214            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
215
216            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
217
218            #[cfg(feature = "enterprise")]
219            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
220
221            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
222                let query_output = self
223                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
224                    .await?;
225                let req = to_copy_query_request(stmt.arg)?;
226
227                self.copy_query_to(req, query_output)
228                    .await
229                    .map(Output::new_with_affected_rows)
230            }
231
232            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
233                let req = to_copy_table_request(stmt, query_ctx.clone())?;
234                match req.direction {
235                    CopyDirection::Export => self
236                        .copy_table_to(req, query_ctx)
237                        .await
238                        .map(Output::new_with_affected_rows),
239                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
240                }
241            }
242
243            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
244                match copy_database {
245                    CopyDatabase::To(arg) => {
246                        self.copy_database_to(
247                            to_copy_database_request(arg, &query_ctx)?,
248                            query_ctx.clone(),
249                        )
250                        .await
251                    }
252                    CopyDatabase::From(arg) => {
253                        self.copy_database_from(
254                            to_copy_database_request(arg, &query_ctx)?,
255                            query_ctx,
256                        )
257                        .await
258                    }
259                }
260            }
261
262            Statement::CreateTable(stmt) => {
263                let _ = self.create_table(stmt, query_ctx).await?;
264                Ok(Output::new_with_affected_rows(0))
265            }
266            Statement::CreateTableLike(stmt) => {
267                let _ = self.create_table_like(stmt, query_ctx).await?;
268                Ok(Output::new_with_affected_rows(0))
269            }
270            Statement::CreateExternalTable(stmt) => {
271                let _ = self.create_external_table(stmt, query_ctx).await?;
272                Ok(Output::new_with_affected_rows(0))
273            }
274            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
275            #[cfg(feature = "enterprise")]
276            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
277            Statement::DropFlow(stmt) => {
278                self.drop_flow(
279                    query_ctx.current_catalog().to_string(),
280                    format_raw_object_name(stmt.flow_name()),
281                    stmt.drop_if_exists(),
282                    query_ctx,
283                )
284                .await
285            }
286            #[cfg(feature = "enterprise")]
287            Statement::DropTrigger(stmt) => {
288                self.drop_trigger(
289                    query_ctx.current_catalog().to_string(),
290                    format_raw_object_name(stmt.trigger_name()),
291                    stmt.drop_if_exists(),
292                    query_ctx,
293                )
294                .await
295            }
296            Statement::CreateView(stmt) => {
297                let _ = self.create_view(stmt, query_ctx).await?;
298                Ok(Output::new_with_affected_rows(0))
299            }
300            Statement::DropView(stmt) => {
301                let (catalog_name, schema_name, view_name) =
302                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
303                        .map_err(BoxedError::new)
304                        .context(ExternalSnafu)?;
305
306                self.drop_view(
307                    catalog_name,
308                    schema_name,
309                    view_name,
310                    stmt.drop_if_exists,
311                    query_ctx,
312                )
313                .await
314            }
315            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
316
317            Statement::AlterDatabase(alter_database) => {
318                self.alter_database(alter_database, query_ctx).await
319            }
320
321            #[cfg(feature = "enterprise")]
322            Statement::AlterTrigger(alter_trigger) => {
323                self.alter_trigger(alter_trigger, query_ctx).await
324            }
325
326            Statement::DropTable(stmt) => {
327                let mut table_names = Vec::with_capacity(stmt.table_names().len());
328                for table_name_stmt in stmt.table_names() {
329                    let (catalog, schema, table) =
330                        table_idents_to_full_name(table_name_stmt, &query_ctx)
331                            .map_err(BoxedError::new)
332                            .context(ExternalSnafu)?;
333                    table_names.push(TableName::new(catalog, schema, table));
334                }
335                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
336                    .await
337            }
338            Statement::DropDatabase(stmt) => {
339                self.drop_database(
340                    query_ctx.current_catalog().to_string(),
341                    format_raw_object_name(stmt.name()),
342                    stmt.drop_if_exists(),
343                    query_ctx,
344                )
345                .await
346            }
347            Statement::TruncateTable(stmt) => {
348                let (catalog, schema, table) =
349                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
350                        .map_err(BoxedError::new)
351                        .context(ExternalSnafu)?;
352                let table_name = TableName::new(catalog, schema, table);
353                let time_ranges = self
354                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
355                    .await?;
356                self.truncate_table(table_name, time_ranges, query_ctx)
357                    .await
358            }
359            Statement::CreateDatabase(stmt) => {
360                self.create_database(
361                    &format_raw_object_name(&stmt.name),
362                    stmt.if_not_exists,
363                    stmt.options.into_map(),
364                    query_ctx,
365                )
366                .await
367            }
368            Statement::ShowCreateDatabase(show) => {
369                let (catalog, database) =
370                    idents_to_full_database_name(&show.database_name, &query_ctx)
371                        .map_err(BoxedError::new)
372                        .context(ExternalSnafu)?;
373                let table_metadata_manager = self
374                    .catalog_manager
375                    .as_any()
376                    .downcast_ref::<KvBackendCatalogManager>()
377                    .map(|manager| manager.table_metadata_manager_ref().clone())
378                    .context(UpgradeCatalogManagerRefSnafu)?;
379                let opts: HashMap<String, String> = table_metadata_manager
380                    .schema_manager()
381                    .get(SchemaNameKey::new(&catalog, &database))
382                    .await
383                    .context(TableMetadataManagerSnafu)?
384                    .context(SchemaNotFoundSnafu {
385                        schema_info: &database,
386                    })?
387                    .into_inner()
388                    .into();
389
390                self.show_create_database(&database, opts.into()).await
391            }
392            Statement::ShowCreateTable(show) => {
393                let (catalog, schema, table) =
394                    table_idents_to_full_name(&show.table_name, &query_ctx)
395                        .map_err(BoxedError::new)
396                        .context(ExternalSnafu)?;
397
398                let table_ref = self
399                    .catalog_manager
400                    .table(&catalog, &schema, &table, Some(&query_ctx))
401                    .await
402                    .context(CatalogSnafu)?
403                    .context(TableNotFoundSnafu { table_name: &table })?;
404                let table_name = TableName::new(catalog, schema, table);
405
406                match show.variant {
407                    ShowCreateTableVariant::Original => {
408                        self.show_create_table(table_name, table_ref, query_ctx)
409                            .await
410                    }
411                    ShowCreateTableVariant::PostgresForeignTable => {
412                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
413                            .await
414                    }
415                }
416            }
417            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
418            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
419            #[cfg(feature = "enterprise")]
420            Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
421            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
422            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
423            Statement::ShowColumns(show_columns) => {
424                self.show_columns(show_columns, query_ctx).await
425            }
426            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
427            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
428            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
429            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
430            Statement::Use(db) => self.use_database(db, query_ctx).await,
431            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
432            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
433            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
434        }
435    }
436
437    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
438        let catalog = query_ctx.current_catalog();
439        ensure!(
440            self.catalog_manager
441                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
442                .await
443                .context(CatalogSnafu)?,
444            SchemaNotFoundSnafu { schema_info: &db }
445        );
446
447        query_ctx.set_current_schema(&db);
448
449        Ok(Output::new_with_record_batches(RecordBatches::empty()))
450    }
451
452    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
453        let var_name = set_var.variable.to_string().to_uppercase();
454
455        match var_name.as_str() {
456            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
457
458            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
459                set_timezone(set_var.value, query_ctx)?
460            }
461
462            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
463
464            // Same as "bytea_output", we just ignore it here.
465            // Not harmful since it only relates to how date is viewed in client app's output.
466            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
467            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
468
469            // Allow query to fallback when failed to push down.
470            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
471
472            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
473            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
474                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
475                Channel::Postgres => {
476                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
477                }
478                _ => {
479                    return NotSupportedSnafu {
480                        feat: format!("Unsupported set variable {}", var_name),
481                    }
482                    .fail();
483                }
484            },
485            "STATEMENT_TIMEOUT" => {
486                if query_ctx.channel() == Channel::Postgres {
487                    set_query_timeout(set_var.value, query_ctx)?
488                } else {
489                    return NotSupportedSnafu {
490                        feat: format!("Unsupported set variable {}", var_name),
491                    }
492                    .fail();
493                }
494            }
495            "SEARCH_PATH" => {
496                if query_ctx.channel() == Channel::Postgres {
497                    set_search_path(set_var.value, query_ctx)?
498                } else {
499                    return NotSupportedSnafu {
500                        feat: format!("Unsupported set variable {}", var_name),
501                    }
502                    .fail();
503                }
504            }
505            _ => {
506                // for postgres, we give unknown SET statements a warning with
507                //  success, this is prevent the SET call becoming a blocker
508                //  of connection establishment
509                //
510                if query_ctx.channel() == Channel::Postgres {
511                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
512                } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
513                    // Just ignore `SET @@` commands for MySQL
514                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
515                } else {
516                    return NotSupportedSnafu {
517                        feat: format!("Unsupported set variable {}", var_name),
518                    }
519                    .fail();
520                }
521            }
522        }
523        Ok(Output::new_with_affected_rows(0))
524    }
525
526    #[tracing::instrument(skip_all)]
527    pub async fn plan(
528        &self,
529        stmt: &QueryStatement,
530        query_ctx: QueryContextRef,
531    ) -> Result<LogicalPlan> {
532        self.query_engine
533            .planner()
534            .plan(stmt, query_ctx)
535            .await
536            .context(PlanStatementSnafu)
537    }
538
539    /// Execute [`LogicalPlan`] directly.
540    #[tracing::instrument(skip_all)]
541    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
542        self.query_engine
543            .execute(plan, query_ctx)
544            .await
545            .context(ExecLogicalPlanSnafu)
546    }
547
548    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
549        self.query_engine
550            .planner()
551            .optimize(plan)
552            .context(PlanStatementSnafu)
553    }
554
555    #[tracing::instrument(skip_all)]
556    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
557        let plan = self.plan(&stmt, query_ctx.clone()).await?;
558        self.exec_plan(plan, query_ctx).await
559    }
560
561    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
562        let TableReference {
563            catalog,
564            schema,
565            table,
566        } = table_ref;
567        self.catalog_manager
568            .table(catalog, schema, table, None)
569            .await
570            .context(CatalogSnafu)?
571            .with_context(|| TableNotFoundSnafu {
572                table_name: table_ref.to_string(),
573            })
574    }
575
    /// Returns the procedure executor held by this statement executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }
579
    /// Returns the cache invalidator held by this statement executor.
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }
583
584    /// Convert truncate time ranges for the given table from sql values to timestamps
585    ///
586    pub async fn convert_truncate_time_ranges(
587        &self,
588        table_name: &TableName,
589        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
590        query_ctx: &QueryContextRef,
591    ) -> Result<Vec<(Timestamp, Timestamp)>> {
592        if sql_values_time_range.is_empty() {
593            return Ok(vec![]);
594        }
595        let table = self.get_table(&table_name.table_ref()).await?;
596        let info = table.table_info();
597        let time_index_dt = info
598            .meta
599            .schema
600            .timestamp_column()
601            .context(UnexpectedSnafu {
602                violated: "Table must have a timestamp column",
603            })?;
604
605        let time_unit = time_index_dt
606            .data_type
607            .as_timestamp()
608            .with_context(|| UnexpectedSnafu {
609                violated: format!(
610                    "Table {}'s time index column must be a timestamp type, found: {:?}",
611                    table_name, time_index_dt
612                ),
613            })?
614            .unit();
615
616        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
617        for (start, end) in sql_values_time_range {
618            let start = common_sql::convert::sql_value_to_value(
619                "range_start",
620                &ConcreteDataType::timestamp_datatype(time_unit),
621                start,
622                Some(&query_ctx.timezone()),
623                None,
624                false,
625            )
626            .context(SqlCommonSnafu)
627            .and_then(|v| {
628                if let datatypes::value::Value::Timestamp(t) = v {
629                    Ok(t)
630                } else {
631                    error::InvalidSqlSnafu {
632                        err_msg: format!("Expected a timestamp value, found {v:?}"),
633                    }
634                    .fail()
635                }
636            })?;
637
638            let end = common_sql::convert::sql_value_to_value(
639                "range_end",
640                &ConcreteDataType::timestamp_datatype(time_unit),
641                end,
642                Some(&query_ctx.timezone()),
643                None,
644                false,
645            )
646            .context(SqlCommonSnafu)
647            .and_then(|v| {
648                if let datatypes::value::Value::Timestamp(t) = v {
649                    Ok(t)
650                } else {
651                    error::InvalidSqlSnafu {
652                        err_msg: format!("Expected a timestamp value, found {v:?}"),
653                    }
654                    .fail()
655                }
656            })?;
657            time_ranges.push((start, end));
658        }
659        Ok(time_ranges)
660    }
661
    /// Returns the inserter for the statement executor.
    ///
    /// Only visible inside this crate.
    pub(crate) fn inserter(&self) -> &InserterRef {
        &self.inserter
    }
666}
667
668fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
669    let CopyQueryToArgument {
670        with,
671        connection,
672        location,
673    } = stmt;
674
675    Ok(CopyQueryToRequest {
676        location,
677        with: with.into_map(),
678        connection: connection.into_map(),
679    })
680}
681
682// Verifies time related format is valid
683fn verify_time_related_format(with: &OptionMap) -> Result<()> {
684    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
685    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
686    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
687    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
688
689    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
690        ensure!(
691            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
692            error::TimestampFormatNotSupportedSnafu {
693                format: "<unknown>".to_string(),
694                file_format: file_format.cloned().unwrap_or_default(),
695            }
696        );
697    }
698
699    for (key, format_opt) in [
700        (common_datasource::file_format::TIME_FORMAT, time_format),
701        (common_datasource::file_format::DATE_FORMAT, date_format),
702        (
703            common_datasource::file_format::TIMESTAMP_FORMAT,
704            timestamp_format,
705        ),
706    ] {
707        if let Some(format) = format_opt {
708            chrono::format::strftime::StrftimeItems::new(format)
709                .parse()
710                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
711        }
712    }
713
714    Ok(())
715}
716
717fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
718    let direction = match stmt {
719        CopyTable::To(_) => CopyDirection::Export,
720        CopyTable::From(_) => CopyDirection::Import,
721    };
722
723    let CopyTableArgument {
724        location,
725        connection,
726        with,
727        table_name,
728        limit,
729        ..
730    } = match stmt {
731        CopyTable::To(arg) => arg,
732        CopyTable::From(arg) => arg,
733    };
734    let (catalog_name, schema_name, table_name) =
735        table_idents_to_full_name(&table_name, &query_ctx)
736            .map_err(BoxedError::new)
737            .context(ExternalSnafu)?;
738
739    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
740
741    verify_time_related_format(&with)?;
742
743    let pattern = with
744        .get(common_datasource::file_format::FILE_PATTERN)
745        .cloned();
746
747    Ok(CopyTableRequest {
748        catalog_name,
749        schema_name,
750        table_name,
751        location,
752        with: with.into_map(),
753        connection: connection.into_map(),
754        pattern,
755        direction,
756        timestamp_range,
757        limit,
758    })
759}
760
761/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
762/// This function extracts the necessary info including catalog/database name, time range, etc.
763fn to_copy_database_request(
764    arg: CopyDatabaseArgument,
765    query_ctx: &QueryContextRef,
766) -> Result<CopyDatabaseRequest> {
767    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
768        .map_err(BoxedError::new)
769        .context(ExternalSnafu)?;
770    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
771
772    Ok(CopyDatabaseRequest {
773        catalog_name,
774        schema_name: database_name,
775        location: arg.location,
776        with: arg.with.into_map(),
777        connection: arg.connection.into_map(),
778        time_range,
779    })
780}
781
782/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
783/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
784/// The timezone used for conversion will respect that inside `query_ctx`.
785fn timestamp_range_from_option_map(
786    options: &OptionMap,
787    query_ctx: &QueryContextRef,
788) -> Result<Option<TimestampRange>> {
789    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
790    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
791    let time_range = match (start_timestamp, end_timestamp) {
792        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
793            error::InvalidTimestampRangeSnafu {
794                start: start.to_iso8601_string(),
795                end: end.to_iso8601_string(),
796            }
797        })?),
798        (Some(start), None) => Some(TimestampRange::from_start(start)),
799        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
800        (None, None) => None,
801    };
802    Ok(time_range)
803}
804
805/// Extracts timestamp from a [HashMap<String, String>] with given key.
806fn extract_timestamp(
807    map: &OptionMap,
808    key: &str,
809    query_ctx: &QueryContextRef,
810) -> Result<Option<Timestamp>> {
811    map.get(key)
812        .map(|v| {
813            Timestamp::from_str(v, Some(&query_ctx.timezone()))
814                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
815        })
816        .transpose()
817}
818
819fn idents_to_full_database_name(
820    obj_name: &ObjectName,
821    query_ctx: &QueryContextRef,
822) -> Result<(String, String)> {
823    match &obj_name.0[..] {
824        [database] => Ok((
825            query_ctx.current_catalog().to_owned(),
826            database.to_string_unquoted(),
827        )),
828        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
829        _ => InvalidSqlSnafu {
830            err_msg: format!(
831                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
832            ),
833        }
834        .fail(),
835    }
836}
837
838/// The [`Inserter`] implementation for the statement executor.
839pub struct InserterImpl {
840    statement_executor: StatementExecutorRef,
841    options: Option<InsertOptions>,
842}
843
844impl InserterImpl {
845    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
846        Self {
847            statement_executor,
848            options,
849        }
850    }
851}
852
853#[async_trait::async_trait]
854impl Inserter for InserterImpl {
855    async fn insert_rows(
856        &self,
857        context: &client::inserter::Context<'_>,
858        requests: RowInsertRequests,
859    ) -> ClientResult<()> {
860        let mut ctx_builder = QueryContextBuilder::default()
861            .current_catalog(context.catalog.to_string())
862            .current_schema(context.schema.to_string());
863        if let Some(options) = self.options.as_ref() {
864            ctx_builder = ctx_builder
865                .set_extension(
866                    TTL_KEY.to_string(),
867                    format_duration(options.ttl).to_string(),
868                )
869                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
870        }
871        let query_ctx = ctx_builder.build().into();
872
873        self.statement_executor
874            .inserter()
875            .handle_row_inserts(
876                requests,
877                query_ctx,
878                self.statement_executor.as_ref(),
879                false,
880                false,
881            )
882            .await
883            .map_err(BoxedError::new)
884            .context(ClientExternalSnafu)
885            .map(|_| ())
886    }
887
888    fn set_options(&mut self, options: &InsertOptions) {
889        self.options = Some(*options);
890    }
891}
892
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_datasource::file_format::{FORMAT_TYPE, TIMESTAMP_FORMAT};
    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    /// Builds an [OptionMap] from borrowed key/value pairs.
    fn option_map(entries: &[(&str, &str)]) -> OptionMap {
        let raw: HashMap<String, String> = entries
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect();
        OptionMap::from(raw)
    }

    /// Parses the `(start, end)` pair as a time range in the "Asia/Shanghai" timezone.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let shanghai = Timezone::from_tz_string("Asia/Shanghai").unwrap();
        let query_ctx = QueryContextBuilder::default()
            .timezone(shanghai)
            .build()
            .into();
        let options = option_map(&[
            (COPY_DATABASE_TIME_START_KEY, start),
            (COPY_DATABASE_TIME_END_KEY, end),
        ]);
        timestamp_range_from_option_map(&options, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // A well-ordered pair yields the matching range.
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        let actual = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap();
        assert_eq!(Some(expected), actual);

        // An end before the start is rejected.
        let reversed = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00"));
        assert_matches!(
            reversed.unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        // A valid strftime pattern with the csv format is accepted.
        let csv_valid = option_map(&[
            (TIMESTAMP_FORMAT, "%Y-%m-%d %H:%M:%S"),
            (FORMAT_TYPE, "csv"),
        ]);
        assert!(verify_time_related_format(&csv_valid).is_ok());

        // The same pattern with the json format is rejected.
        let json_valid = option_map(&[
            (TIMESTAMP_FORMAT, "%Y-%m-%d %H:%M:%S"),
            (FORMAT_TYPE, "json"),
        ]);
        assert_matches!(
            verify_time_related_format(&json_valid).unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );

        // A malformed strftime pattern is rejected as an invalid parameter.
        let csv_malformed = option_map(&[(TIMESTAMP_FORMAT, "%111112"), (FORMAT_TYPE, "csv")]);
        assert_matches!(
            verify_time_related_format(&csv_malformed).unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}