operator/statement.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod admin;
mod copy_database;
mod copy_query_to;
mod copy_table_from;
mod copy_table_to;
mod cursor;
pub mod ddl;
mod describe;
mod dml;
mod kill;
mod set;
mod show;
mod tql;

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::RowInsertRequests;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
use client::inserter::{InsertOptions, Inserter};
use client::RecordBatches;
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use humantime::format_duration;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::QueryEngineRef;
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use set::{set_query_timeout, set_read_preference};
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::ObjectNamePartExt;
use sql::statements::copy::{
    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
};
use sql::statements::set_variables::SetVariables;
use sql::statements::show::ShowCreateTableVariant;
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
use sqlparser::ast::ObjectName;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
use table::table_name::TableName;
use table::table_reference::TableReference;
use table::TableRef;

use self::set::{
    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
};
use crate::error::{
    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::statement::set::set_allow_query_fallback;

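/// Executes SQL [`Statement`]s, dispatching each statement kind to the matching
/// handler (query planning and execution, DDL, COPY, SHOW, SET, cursor and admin
/// commands) using the catalog, metadata managers, and inserter it holds.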
#[derive(Clone)]
pub struct StatementExecutor {
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    procedure_executor: ProcedureExecutorRef,
    table_metadata_manager: TableMetadataManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    view_info_manager: ViewInfoManagerRef,
    partition_manager: PartitionRuleManagerRef,
    cache_invalidator: CacheInvalidatorRef,
    inserter: InserterRef,
    process_manager: Option<ProcessManagerRef>,
}

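/// A shared, reference-counted [`StatementExecutor`].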
pub type StatementExecutorRef = Arc<StatementExecutor>;

impl StatementExecutor {
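    /// Creates a new [`StatementExecutor`], building the table/flow/view metadata
    /// managers and the partition rule manager on top of the given KV backend.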
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
        procedure_executor: ProcedureExecutorRef,
        kv_backend: KvBackendRef,
        cache_invalidator: CacheInvalidatorRef,
        inserter: InserterRef,
        table_route_cache: TableRouteCacheRef,
        process_manager: Option<ProcessManagerRef>,
    ) -> Self {
        Self {
            catalog_manager,
            query_engine,
            procedure_executor,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
            cache_invalidator,
            inserter,
            process_manager,
        }
    }

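    /// Executes a parsed [`QueryStatement`] (SQL or PromQL); only compiled with
    /// the `testing` feature.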
    #[cfg(feature = "testing")]
    pub async fn execute_stmt(
        &self,
        stmt: QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        match stmt {
            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
            QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
        }
    }

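    /// Executes a single SQL [`Statement`], routing it to the corresponding
    /// handler: queries, EXPLAIN and DELETE are planned and executed, while DDL,
    /// COPY, SHOW, SET, cursor and admin statements are handled directly.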
    #[tracing::instrument(skip_all)]
    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
            }

            Statement::DeclareCursor(declare_cursor) => {
                self.declare_cursor(declare_cursor, query_ctx).await
            }
            Statement::FetchCursor(fetch_cursor) => {
                self.fetch_cursor(fetch_cursor, query_ctx).await
            }
            Statement::CloseCursor(close_cursor) => {
                self.close_cursor(close_cursor, query_ctx).await
            }

            Statement::Insert(insert) => self.insert(insert, query_ctx).await,

            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,

            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,

            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,

            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,

            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,

            #[cfg(feature = "enterprise")]
            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,

            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
                let query_output = self
                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
                    .await?;
                let req = to_copy_query_request(stmt.arg)?;

                self.copy_query_to(req, query_output)
                    .await
                    .map(Output::new_with_affected_rows)
            }

            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                let req = to_copy_table_request(stmt, query_ctx.clone())?;
                match req.direction {
                    CopyDirection::Export => self
                        .copy_table_to(req, query_ctx)
                        .await
                        .map(Output::new_with_affected_rows),
                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
                }
            }

            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
                match copy_database {
                    CopyDatabase::To(arg) => {
                        self.copy_database_to(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx.clone(),
                        )
                        .await
                    }
                    CopyDatabase::From(arg) => {
                        self.copy_database_from(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx,
                        )
                        .await
                    }
                }
            }

            Statement::CreateTable(stmt) => {
                let _ = self.create_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateTableLike(stmt) => {
                let _ = self.create_table_like(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateExternalTable(stmt) => {
                let _ = self.create_external_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
            Statement::DropFlow(stmt) => {
                self.drop_flow(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.flow_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            #[cfg(feature = "enterprise")]
            Statement::DropTrigger(stmt) => {
                self.drop_trigger(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.trigger_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::CreateView(stmt) => {
                let _ = self.create_view(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::DropView(stmt) => {
                let (catalog_name, schema_name, view_name) =
                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                self.drop_view(
                    catalog_name,
                    schema_name,
                    view_name,
                    stmt.drop_if_exists,
                    query_ctx,
                )
                .await
            }
            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,

            Statement::AlterDatabase(alter_database) => {
                self.alter_database(alter_database, query_ctx).await
            }

            #[cfg(feature = "enterprise")]
            Statement::AlterTrigger(alter_trigger) => {
                self.alter_trigger(alter_trigger, query_ctx).await
            }

            Statement::DropTable(stmt) => {
                let mut table_names = Vec::with_capacity(stmt.table_names().len());
                for table_name_stmt in stmt.table_names() {
                    let (catalog, schema, table) =
                        table_idents_to_full_name(table_name_stmt, &query_ctx)
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                    table_names.push(TableName::new(catalog, schema, table));
                }
                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
                    .await
            }
            Statement::DropDatabase(stmt) => {
                self.drop_database(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::TruncateTable(stmt) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_name = TableName::new(catalog, schema, table);
                let time_ranges = self
                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
                    .await?;
                self.truncate_table(table_name, time_ranges, query_ctx)
                    .await
            }
            Statement::CreateDatabase(stmt) => {
                self.create_database(
                    &format_raw_object_name(&stmt.name),
                    stmt.if_not_exists,
                    stmt.options.into_map(),
                    query_ctx,
                )
                .await
            }
            Statement::ShowCreateDatabase(show) => {
                let (catalog, database) =
                    idents_to_full_database_name(&show.database_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_metadata_manager = self
                    .catalog_manager
                    .as_any()
                    .downcast_ref::<KvBackendCatalogManager>()
                    .map(|manager| manager.table_metadata_manager_ref().clone())
                    .context(UpgradeCatalogManagerRefSnafu)?;
                let opts: HashMap<String, String> = table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog, &database))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    .context(SchemaNotFoundSnafu {
                        schema_info: &database,
                    })?
                    .into_inner()
                    .into();

                self.show_create_database(&database, opts.into()).await
            }
            Statement::ShowCreateTable(show) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(&show.table_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                let table_ref = self
                    .catalog_manager
                    .table(&catalog, &schema, &table, Some(&query_ctx))
                    .await
                    .context(CatalogSnafu)?
                    .context(TableNotFoundSnafu { table_name: &table })?;
                let table_name = TableName::new(catalog, schema, table);

                match show.variant {
                    ShowCreateTableVariant::Original => {
                        self.show_create_table(table_name, table_ref, query_ctx)
                            .await
                    }
                    ShowCreateTableVariant::PostgresForeignTable => {
                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
                            .await
                    }
                }
            }
            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
            Statement::ShowColumns(show_columns) => {
                self.show_columns(show_columns, query_ctx).await
            }
            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
            Statement::Use(db) => self.use_database(db, query_ctx).await,
            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
        }
    }

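    /// Handles `USE <database>`: verifies the schema exists in the current
    /// catalog and switches the session's current schema to it.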
    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
        let catalog = query_ctx.current_catalog();
        ensure!(
            self.catalog_manager
                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
                .await
                .context(CatalogSnafu)?,
            SchemaNotFoundSnafu { schema_info: &db }
        );

        query_ctx.set_current_schema(&db);

        Ok(Output::new_with_record_batches(RecordBatches::empty()))
    }

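    /// Handles `SET <variable> = <value>` statements. Known variables update the
    /// session context; unknown ones produce a warning (PostgreSQL, or MySQL
    /// `@@` variables) or an error, depending on the client channel.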
    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
        let var_name = set_var.variable.to_string().to_uppercase();

        match var_name.as_str() {
            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,

            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
                set_timezone(set_var.value, query_ctx)?
            }

            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,

            // Like "bytea_output", this only affects how dates are rendered in
            // the client's output, so a lenient handling here is harmless.
            // Tracking issue: https://github.com/GreptimeTeam/greptimedb/issues/3442.
            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,

            // Allow queries to fall back when push-down fails.
            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,

            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Postgres => {
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail()
                }
            },
            "STATEMENT_TIMEOUT" => {
                if query_ctx.channel() == Channel::Postgres {
                    set_query_timeout(set_var.value, query_ctx)?
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
            "SEARCH_PATH" => {
                if query_ctx.channel() == Channel::Postgres {
                    set_search_path(set_var.value, query_ctx)?
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
            _ => {
                // For Postgres, unknown SET statements succeed with a warning;
                // this prevents a SET call from becoming a blocker during
                // connection establishment.
                if query_ctx.channel() == Channel::Postgres {
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
                    // For MySQL, unknown `SET @@...` variables only produce a warning.
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
        }
        Ok(Output::new_with_affected_rows(0))
    }

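    /// Plans a [`QueryStatement`] into a [`LogicalPlan`] without executing it.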
    #[tracing::instrument(skip_all)]
    pub async fn plan(
        &self,
        stmt: &QueryStatement,
        query_ctx: QueryContextRef,
    ) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .plan(stmt, query_ctx)
            .await
            .context(PlanStatementSnafu)
    }

    /// Execute [`LogicalPlan`] directly.
    #[tracing::instrument(skip_all)]
    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

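    /// Runs the planner's optimizer over an already built [`LogicalPlan`].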
    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        self.query_engine
            .planner()
            .optimize(plan)
            .context(PlanStatementSnafu)
    }

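    /// Plans a [`QueryStatement`] and executes the resulting [`LogicalPlan`].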
    #[tracing::instrument(skip_all)]
    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
        let plan = self.plan(&stmt, query_ctx.clone()).await?;
        self.exec_plan(plan, query_ctx).await
    }

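    /// Resolves a [`TableReference`] through the catalog manager, returning an
    /// error if the table does not exist.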
    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
        let TableReference {
            catalog,
            schema,
            table,
        } = table_ref;
        self.catalog_manager
            .table(catalog, schema, table, None)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table_ref.to_string(),
            })
    }

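    /// Returns a reference to the underlying procedure executor.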
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

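    /// Returns a reference to the cache invalidator.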
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }

    /// Converts the time ranges of a `TRUNCATE` statement for the given table
    /// from SQL values into [`Timestamp`] pairs, using the unit of the table's
    /// time index column.
    pub async fn convert_truncate_time_ranges(
        &self,
        table_name: &TableName,
        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
        query_ctx: &QueryContextRef,
    ) -> Result<Vec<(Timestamp, Timestamp)>> {
        if sql_values_time_range.is_empty() {
            return Ok(vec![]);
        }
        let table = self.get_table(&table_name.table_ref()).await?;
        let info = table.table_info();
        let time_index_dt = info
            .meta
            .schema
            .timestamp_column()
            .context(UnexpectedSnafu {
                violated: "Table must have a timestamp column",
            })?;

        let time_unit = time_index_dt
            .data_type
            .as_timestamp()
            .with_context(|| UnexpectedSnafu {
                violated: format!(
                    "Table {}'s time index column must be a timestamp type, found: {:?}",
                    table_name, time_index_dt
                ),
            })?
            .unit();

        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
        for (start, end) in sql_values_time_range {
            let start = common_sql::convert::sql_value_to_value(
                "range_start",
                &ConcreteDataType::timestamp_datatype(time_unit),
                start,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;

            let end = common_sql::convert::sql_value_to_value(
                "range_end",
                &ConcreteDataType::timestamp_datatype(time_unit),
                end,
                Some(&query_ctx.timezone()),
                None,
                false,
            )
            .context(SqlCommonSnafu)
            .and_then(|v| {
                if let datatypes::value::Value::Timestamp(t) = v {
                    Ok(t)
                } else {
                    error::InvalidSqlSnafu {
                        err_msg: format!("Expected a timestamp value, found {v:?}"),
                    }
                    .fail()
                }
            })?;
            time_ranges.push((start, end));
        }
        Ok(time_ranges)
    }

    /// Returns the inserter for the statement executor.
    pub(crate) fn inserter(&self) -> &InserterRef {
        &self.inserter
    }
}

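/// Converts a [CopyQueryToArgument] to a [CopyQueryToRequest].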
fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
    let CopyQueryToArgument {
        with,
        connection,
        location,
    } = stmt;

    Ok(CopyQueryToRequest {
        location,
        with: with.into_map(),
        connection: connection.into_map(),
    })
}

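/// Converts a [CopyTable] statement to a [CopyTableRequest], resolving the fully
/// qualified table name and extracting the optional timestamp range and file
/// pattern from the `WITH` options.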
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
    let direction = match stmt {
        CopyTable::To(_) => CopyDirection::Export,
        CopyTable::From(_) => CopyDirection::Import,
    };

    let CopyTableArgument {
        location,
        connection,
        with,
        table_name,
        limit,
        ..
    } = match stmt {
        CopyTable::To(arg) => arg,
        CopyTable::From(arg) => arg,
    };
    let (catalog_name, schema_name, table_name) =
        table_idents_to_full_name(&table_name, &query_ctx)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;

    let pattern = with
        .get(common_datasource::file_format::FILE_PATTERN)
        .cloned();

    Ok(CopyTableRequest {
        catalog_name,
        schema_name,
        table_name,
        location,
        with: with.into_map(),
        connection: connection.into_map(),
        pattern,
        direction,
        timestamp_range,
        limit,
    })
}

/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
/// This function extracts the necessary info including catalog/database name, time range, etc.
fn to_copy_database_request(
    arg: CopyDatabaseArgument,
    query_ctx: &QueryContextRef,
) -> Result<CopyDatabaseRequest> {
    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;

    Ok(CopyDatabaseRequest {
        catalog_name,
        schema_name: database_name,
        location: arg.location,
        with: arg.with.into_map(),
        connection: arg.connection.into_map(),
        time_range,
    })
}

/// Extracts a timestamp range from an [OptionMap] using the keys `start_time` and `end_time`.
/// Each value must be a valid timestamp string as accepted by [Timestamp::from_str];
/// the conversion uses the timezone carried in `query_ctx`.
fn timestamp_range_from_option_map(
    options: &OptionMap,
    query_ctx: &QueryContextRef,
) -> Result<Option<TimestampRange>> {
    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
    let time_range = match (start_timestamp, end_timestamp) {
        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
            error::InvalidTimestampRangeSnafu {
                start: start.to_iso8601_string(),
                end: end.to_iso8601_string(),
            }
        })?),
        (Some(start), None) => Some(TimestampRange::from_start(start)),
        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
        (None, None) => None,
    };
    Ok(time_range)
}

/// Extracts a timestamp from the given [OptionMap] by key, parsed in the session's timezone.
fn extract_timestamp(
    map: &OptionMap,
    key: &str,
    query_ctx: &QueryContextRef,
) -> Result<Option<Timestamp>> {
    map.get(key)
        .map(|v| {
            Timestamp::from_str(v, Some(&query_ctx.timezone()))
                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
        })
        .transpose()
}

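/// Resolves an [ObjectName] of the form `<database>` or `<catalog>.<database>`
/// into a `(catalog, database)` pair; a bare `<database>` uses the session's
/// current catalog. For example, `my_catalog.my_db` resolves to
/// `("my_catalog", "my_db")`.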
fn idents_to_full_database_name(
    obj_name: &ObjectName,
    query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
    match &obj_name.0[..] {
        [database] => Ok((
            query_ctx.current_catalog().to_owned(),
            database.to_string_unquoted(),
        )),
        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
            ),
        }
        .fail(),
    }
}

/// The [`Inserter`] implementation for the statement executor.
pub struct InserterImpl {
    statement_executor: StatementExecutorRef,
    options: Option<InsertOptions>,
}

impl InserterImpl {
    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
        Self {
            statement_executor,
            options,
        }
    }
}

#[async_trait::async_trait]
impl Inserter for InserterImpl {
    async fn insert_rows(
        &self,
        context: &client::inserter::Context<'_>,
        requests: RowInsertRequests,
    ) -> ClientResult<()> {
        let mut ctx_builder = QueryContextBuilder::default()
            .current_catalog(context.catalog.to_string())
            .current_schema(context.schema.to_string());
        if let Some(options) = self.options.as_ref() {
            ctx_builder = ctx_builder
                .set_extension(
                    TTL_KEY.to_string(),
                    format_duration(options.ttl).to_string(),
                )
                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
        }
        let query_ctx = ctx_builder.build().into();

        self.statement_executor
            .inserter()
            .handle_row_inserts(
                requests,
                query_ctx,
                self.statement_executor.as_ref(),
                false,
                false,
            )
            .await
            .map_err(BoxedError::new)
            .context(ClientExternalSnafu)
            .map(|_| ())
    }

    fn set_options(&mut self, options: &InsertOptions) {
        self.options = Some(*options);
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::timestamp_range_from_option_map;

    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let map = OptionMap::from(
            [
                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        assert_eq!(
            Some(
                TimestampRange::new(
                    Timestamp::new_second(1649635200),
                    Timestamp::new_second(1649664000),
                )
                .unwrap(),
            ),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
        );

        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }
}