operator/
statement.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::time::Duration;
33
34use async_stream::stream;
35use catalog::kvbackend::KvBackendCatalogManager;
36use catalog::process_manager::ProcessManagerRef;
37use catalog::CatalogManagerRef;
38use client::{OutputData, RecordBatches};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::ddl::ProcedureExecutorRef;
43use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
44use common_meta::key::schema_name::SchemaNameKey;
45use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
46use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
47use common_meta::kv_backend::KvBackendRef;
48use common_query::Output;
49use common_recordbatch::error::StreamTimeoutSnafu;
50use common_recordbatch::RecordBatchStreamWrapper;
51use common_telemetry::tracing;
52use common_time::range::TimestampRange;
53use common_time::Timestamp;
54use datafusion_expr::LogicalPlan;
55use futures::stream::{Stream, StreamExt};
56use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
57use query::parser::QueryStatement;
58use query::QueryEngineRef;
59use session::context::{Channel, QueryContextRef};
60use session::table_name::table_idents_to_full_name;
61use set::{set_query_timeout, set_read_preference};
62use snafu::{ensure, OptionExt, ResultExt};
63use sql::statements::copy::{
64    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
65};
66use sql::statements::set_variables::SetVariables;
67use sql::statements::show::ShowCreateTableVariant;
68use sql::statements::statement::Statement;
69use sql::statements::OptionMap;
70use sql::util::format_raw_object_name;
71use sqlparser::ast::ObjectName;
72use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
73use table::table_name::TableName;
74use table::table_reference::TableReference;
75use table::TableRef;
76
77use self::set::{
78    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
79};
80use crate::error::{
81    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
82    PlanStatementSnafu, Result, SchemaNotFoundSnafu, StatementTimeoutSnafu,
83    TableMetadataManagerSnafu, TableNotFoundSnafu, UpgradeCatalogManagerRefSnafu,
84};
85use crate::insert::InserterRef;
86use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
87
88#[derive(Clone)]
89pub struct StatementExecutor {
90    catalog_manager: CatalogManagerRef,
91    query_engine: QueryEngineRef,
92    procedure_executor: ProcedureExecutorRef,
93    table_metadata_manager: TableMetadataManagerRef,
94    flow_metadata_manager: FlowMetadataManagerRef,
95    view_info_manager: ViewInfoManagerRef,
96    partition_manager: PartitionRuleManagerRef,
97    cache_invalidator: CacheInvalidatorRef,
98    inserter: InserterRef,
99    process_manager: Option<ProcessManagerRef>,
100}
101
102pub type StatementExecutorRef = Arc<StatementExecutor>;
103
104impl StatementExecutor {
105    #[allow(clippy::too_many_arguments)]
106    pub fn new(
107        catalog_manager: CatalogManagerRef,
108        query_engine: QueryEngineRef,
109        procedure_executor: ProcedureExecutorRef,
110        kv_backend: KvBackendRef,
111        cache_invalidator: CacheInvalidatorRef,
112        inserter: InserterRef,
113        table_route_cache: TableRouteCacheRef,
114        process_manager: Option<ProcessManagerRef>,
115    ) -> Self {
116        Self {
117            catalog_manager,
118            query_engine,
119            procedure_executor,
120            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
121            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
122            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
123            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
124            cache_invalidator,
125            inserter,
126            process_manager,
127        }
128    }
129
130    #[cfg(feature = "testing")]
131    pub async fn execute_stmt(
132        &self,
133        stmt: QueryStatement,
134        query_ctx: QueryContextRef,
135    ) -> Result<Output> {
136        match stmt {
137            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
138            QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
139        }
140    }
141
142    #[tracing::instrument(skip_all)]
143    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
144        match stmt {
145            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
146                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
147            }
148
149            Statement::DeclareCursor(declare_cursor) => {
150                self.declare_cursor(declare_cursor, query_ctx).await
151            }
152            Statement::FetchCursor(fetch_cursor) => {
153                self.fetch_cursor(fetch_cursor, query_ctx).await
154            }
155            Statement::CloseCursor(close_cursor) => {
156                self.close_cursor(close_cursor, query_ctx).await
157            }
158
159            Statement::Insert(insert) => self.insert(insert, query_ctx).await,
160
161            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
162
163            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
164
165            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
166
167            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
168
169            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
170
171            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
172
173            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
174
175            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
176
177            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
178
179            #[cfg(feature = "enterprise")]
180            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
181
182            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
183                let query_output = self
184                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
185                    .await?;
186                let req = to_copy_query_request(stmt.arg)?;
187
188                self.copy_query_to(req, query_output)
189                    .await
190                    .map(Output::new_with_affected_rows)
191            }
192
193            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
194                let req = to_copy_table_request(stmt, query_ctx.clone())?;
195                match req.direction {
196                    CopyDirection::Export => self
197                        .copy_table_to(req, query_ctx)
198                        .await
199                        .map(Output::new_with_affected_rows),
200                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
201                }
202            }
203
204            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
205                match copy_database {
206                    CopyDatabase::To(arg) => {
207                        self.copy_database_to(
208                            to_copy_database_request(arg, &query_ctx)?,
209                            query_ctx.clone(),
210                        )
211                        .await
212                    }
213                    CopyDatabase::From(arg) => {
214                        self.copy_database_from(
215                            to_copy_database_request(arg, &query_ctx)?,
216                            query_ctx,
217                        )
218                        .await
219                    }
220                }
221            }
222
223            Statement::CreateTable(stmt) => {
224                let _ = self.create_table(stmt, query_ctx).await?;
225                Ok(Output::new_with_affected_rows(0))
226            }
227            Statement::CreateTableLike(stmt) => {
228                let _ = self.create_table_like(stmt, query_ctx).await?;
229                Ok(Output::new_with_affected_rows(0))
230            }
231            Statement::CreateExternalTable(stmt) => {
232                let _ = self.create_external_table(stmt, query_ctx).await?;
233                Ok(Output::new_with_affected_rows(0))
234            }
235            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
236            #[cfg(feature = "enterprise")]
237            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
238            Statement::DropFlow(stmt) => {
239                self.drop_flow(
240                    query_ctx.current_catalog().to_string(),
241                    format_raw_object_name(stmt.flow_name()),
242                    stmt.drop_if_exists(),
243                    query_ctx,
244                )
245                .await
246            }
247            #[cfg(feature = "enterprise")]
248            Statement::DropTrigger(stmt) => {
249                self.drop_trigger(
250                    query_ctx.current_catalog().to_string(),
251                    format_raw_object_name(stmt.trigger_name()),
252                    stmt.drop_if_exists(),
253                    query_ctx,
254                )
255                .await
256            }
257            Statement::CreateView(stmt) => {
258                let _ = self.create_view(stmt, query_ctx).await?;
259                Ok(Output::new_with_affected_rows(0))
260            }
261            Statement::DropView(stmt) => {
262                let (catalog_name, schema_name, view_name) =
263                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
264                        .map_err(BoxedError::new)
265                        .context(ExternalSnafu)?;
266
267                self.drop_view(
268                    catalog_name,
269                    schema_name,
270                    view_name,
271                    stmt.drop_if_exists,
272                    query_ctx,
273                )
274                .await
275            }
276            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
277
278            Statement::AlterDatabase(alter_database) => {
279                self.alter_database(alter_database, query_ctx).await
280            }
281
282            Statement::DropTable(stmt) => {
283                let mut table_names = Vec::with_capacity(stmt.table_names().len());
284                for table_name_stmt in stmt.table_names() {
285                    let (catalog, schema, table) =
286                        table_idents_to_full_name(table_name_stmt, &query_ctx)
287                            .map_err(BoxedError::new)
288                            .context(ExternalSnafu)?;
289                    table_names.push(TableName::new(catalog, schema, table));
290                }
291                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
292                    .await
293            }
294            Statement::DropDatabase(stmt) => {
295                self.drop_database(
296                    query_ctx.current_catalog().to_string(),
297                    format_raw_object_name(stmt.name()),
298                    stmt.drop_if_exists(),
299                    query_ctx,
300                )
301                .await
302            }
303            Statement::TruncateTable(stmt) => {
304                let (catalog, schema, table) =
305                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
306                        .map_err(BoxedError::new)
307                        .context(ExternalSnafu)?;
308                let table_name = TableName::new(catalog, schema, table);
309                self.truncate_table(table_name, query_ctx).await
310            }
311            Statement::CreateDatabase(stmt) => {
312                self.create_database(
313                    &format_raw_object_name(&stmt.name),
314                    stmt.if_not_exists,
315                    stmt.options.into_map(),
316                    query_ctx,
317                )
318                .await
319            }
320            Statement::ShowCreateDatabase(show) => {
321                let (catalog, database) =
322                    idents_to_full_database_name(&show.database_name, &query_ctx)
323                        .map_err(BoxedError::new)
324                        .context(ExternalSnafu)?;
325                let table_metadata_manager = self
326                    .catalog_manager
327                    .as_any()
328                    .downcast_ref::<KvBackendCatalogManager>()
329                    .map(|manager| manager.table_metadata_manager_ref().clone())
330                    .context(UpgradeCatalogManagerRefSnafu)?;
331                let opts: HashMap<String, String> = table_metadata_manager
332                    .schema_manager()
333                    .get(SchemaNameKey::new(&catalog, &database))
334                    .await
335                    .context(TableMetadataManagerSnafu)?
336                    .context(SchemaNotFoundSnafu {
337                        schema_info: &database,
338                    })?
339                    .into_inner()
340                    .into();
341
342                self.show_create_database(&database, opts.into()).await
343            }
344            Statement::ShowCreateTable(show) => {
345                let (catalog, schema, table) =
346                    table_idents_to_full_name(&show.table_name, &query_ctx)
347                        .map_err(BoxedError::new)
348                        .context(ExternalSnafu)?;
349
350                let table_ref = self
351                    .catalog_manager
352                    .table(&catalog, &schema, &table, Some(&query_ctx))
353                    .await
354                    .context(CatalogSnafu)?
355                    .context(TableNotFoundSnafu { table_name: &table })?;
356                let table_name = TableName::new(catalog, schema, table);
357
358                match show.variant {
359                    ShowCreateTableVariant::Original => {
360                        self.show_create_table(table_name, table_ref, query_ctx)
361                            .await
362                    }
363                    ShowCreateTableVariant::PostgresForeignTable => {
364                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
365                            .await
366                    }
367                }
368            }
369            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
370            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
371            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
372            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
373            Statement::ShowColumns(show_columns) => {
374                self.show_columns(show_columns, query_ctx).await
375            }
376            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
377            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
378            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
379            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
380            Statement::Use(db) => self.use_database(db, query_ctx).await,
381            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
382            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
383            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
384        }
385    }
386
387    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
388        let catalog = query_ctx.current_catalog();
389        ensure!(
390            self.catalog_manager
391                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
392                .await
393                .context(CatalogSnafu)?,
394            SchemaNotFoundSnafu { schema_info: &db }
395        );
396
397        query_ctx.set_current_schema(&db);
398
399        Ok(Output::new_with_record_batches(RecordBatches::empty()))
400    }
401
402    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
403        let var_name = set_var.variable.to_string().to_uppercase();
404
405        match var_name.as_str() {
406            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
407
408            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
409                set_timezone(set_var.value, query_ctx)?
410            }
411
412            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
413
414            // Same as "bytea_output", we just ignore it here.
415            // Not harmful since it only relates to how date is viewed in client app's output.
416            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
417            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
418
419            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
420            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
421                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
422                Channel::Postgres => {
423                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
424                }
425                _ => {
426                    return NotSupportedSnafu {
427                        feat: format!("Unsupported set variable {}", var_name),
428                    }
429                    .fail()
430                }
431            },
432            "STATEMENT_TIMEOUT" => {
433                if query_ctx.channel() == Channel::Postgres {
434                    set_query_timeout(set_var.value, query_ctx)?
435                } else {
436                    return NotSupportedSnafu {
437                        feat: format!("Unsupported set variable {}", var_name),
438                    }
439                    .fail();
440                }
441            }
442            "SEARCH_PATH" => {
443                if query_ctx.channel() == Channel::Postgres {
444                    set_search_path(set_var.value, query_ctx)?
445                } else {
446                    return NotSupportedSnafu {
447                        feat: format!("Unsupported set variable {}", var_name),
448                    }
449                    .fail();
450                }
451            }
452            _ => {
453                // for postgres, we give unknown SET statements a warning with
454                //  success, this is prevent the SET call becoming a blocker
455                //  of connection establishment
456                //
457                if query_ctx.channel() == Channel::Postgres {
458                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
459                } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
460                    // Just ignore `SET @@` commands for MySQL
461                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
462                } else {
463                    return NotSupportedSnafu {
464                        feat: format!("Unsupported set variable {}", var_name),
465                    }
466                    .fail();
467                }
468            }
469        }
470        Ok(Output::new_with_affected_rows(0))
471    }
472
473    #[tracing::instrument(skip_all)]
474    pub async fn plan(
475        &self,
476        stmt: &QueryStatement,
477        query_ctx: QueryContextRef,
478    ) -> Result<LogicalPlan> {
479        self.query_engine
480            .planner()
481            .plan(stmt, query_ctx)
482            .await
483            .context(PlanStatementSnafu)
484    }
485
486    /// Execute [`LogicalPlan`] directly.
487    #[tracing::instrument(skip_all)]
488    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
489        self.query_engine
490            .execute(plan, query_ctx)
491            .await
492            .context(ExecLogicalPlanSnafu)
493    }
494
495    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
496        self.query_engine
497            .planner()
498            .optimize(plan)
499            .context(PlanStatementSnafu)
500    }
501
502    #[tracing::instrument(skip_all)]
503    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
504        let timeout = derive_timeout(&stmt, &query_ctx);
505        match timeout {
506            Some(timeout) => {
507                let start = tokio::time::Instant::now();
508                let output = tokio::time::timeout(timeout, self.plan_exec_inner(stmt, query_ctx))
509                    .await
510                    .context(StatementTimeoutSnafu)?;
511                // compute remaining timeout
512                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
513                Ok(attach_timeout(output?, remaining_timeout))
514            }
515            None => self.plan_exec_inner(stmt, query_ctx).await,
516        }
517    }
518
519    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
520        let TableReference {
521            catalog,
522            schema,
523            table,
524        } = table_ref;
525        self.catalog_manager
526            .table(catalog, schema, table, None)
527            .await
528            .context(CatalogSnafu)?
529            .with_context(|| TableNotFoundSnafu {
530                table_name: table_ref.to_string(),
531            })
532    }
533
534    async fn plan_exec_inner(
535        &self,
536        stmt: QueryStatement,
537        query_ctx: QueryContextRef,
538    ) -> Result<Output> {
539        let plan = self.plan(&stmt, query_ctx.clone()).await?;
540        self.exec_plan(plan, query_ctx).await
541    }
542}
543
544fn attach_timeout(output: Output, mut timeout: Duration) -> Output {
545    match output.data {
546        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
547        OutputData::Stream(mut stream) => {
548            let schema = stream.schema();
549            let s = Box::pin(stream! {
550                let start = tokio::time::Instant::now();
551                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.context(StreamTimeoutSnafu)? {
552                    yield item;
553                    timeout = timeout.checked_sub(tokio::time::Instant::now() - start).unwrap_or(Duration::ZERO);
554                }
555            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
556            let stream = RecordBatchStreamWrapper {
557                schema,
558                stream: s,
559                output_ordering: None,
560                metrics: Default::default(),
561            };
562            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
563        }
564    }
565}
566
567/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
568/// For MySQL, it applies only to read-only statements.
569fn derive_timeout(stmt: &QueryStatement, query_ctx: &QueryContextRef) -> Option<Duration> {
570    let query_timeout = query_ctx.query_timeout()?;
571    match (query_ctx.channel(), stmt) {
572        (Channel::Mysql, QueryStatement::Sql(Statement::Query(_)))
573        | (Channel::Postgres, QueryStatement::Sql(_)) => Some(query_timeout),
574        (_, _) => None,
575    }
576}
577
578fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
579    let CopyQueryToArgument {
580        with,
581        connection,
582        location,
583    } = stmt;
584
585    Ok(CopyQueryToRequest {
586        location,
587        with: with.into_map(),
588        connection: connection.into_map(),
589    })
590}
591
592fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
593    let direction = match stmt {
594        CopyTable::To(_) => CopyDirection::Export,
595        CopyTable::From(_) => CopyDirection::Import,
596    };
597
598    let CopyTableArgument {
599        location,
600        connection,
601        with,
602        table_name,
603        limit,
604        ..
605    } = match stmt {
606        CopyTable::To(arg) => arg,
607        CopyTable::From(arg) => arg,
608    };
609    let (catalog_name, schema_name, table_name) =
610        table_idents_to_full_name(&table_name, &query_ctx)
611            .map_err(BoxedError::new)
612            .context(ExternalSnafu)?;
613
614    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
615
616    let pattern = with
617        .get(common_datasource::file_format::FILE_PATTERN)
618        .cloned();
619
620    Ok(CopyTableRequest {
621        catalog_name,
622        schema_name,
623        table_name,
624        location,
625        with: with.into_map(),
626        connection: connection.into_map(),
627        pattern,
628        direction,
629        timestamp_range,
630        limit,
631    })
632}
633
634/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
635/// This function extracts the necessary info including catalog/database name, time range, etc.
636fn to_copy_database_request(
637    arg: CopyDatabaseArgument,
638    query_ctx: &QueryContextRef,
639) -> Result<CopyDatabaseRequest> {
640    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
641        .map_err(BoxedError::new)
642        .context(ExternalSnafu)?;
643    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
644
645    Ok(CopyDatabaseRequest {
646        catalog_name,
647        schema_name: database_name,
648        location: arg.location,
649        with: arg.with.into_map(),
650        connection: arg.connection.into_map(),
651        time_range,
652    })
653}
654
655/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
656/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
657/// The timezone used for conversion will respect that inside `query_ctx`.
658fn timestamp_range_from_option_map(
659    options: &OptionMap,
660    query_ctx: &QueryContextRef,
661) -> Result<Option<TimestampRange>> {
662    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
663    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
664    let time_range = match (start_timestamp, end_timestamp) {
665        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
666            error::InvalidTimestampRangeSnafu {
667                start: start.to_iso8601_string(),
668                end: end.to_iso8601_string(),
669            }
670        })?),
671        (Some(start), None) => Some(TimestampRange::from_start(start)),
672        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
673        (None, None) => None,
674    };
675    Ok(time_range)
676}
677
678/// Extracts timestamp from a [HashMap<String, String>] with given key.
679fn extract_timestamp(
680    map: &OptionMap,
681    key: &str,
682    query_ctx: &QueryContextRef,
683) -> Result<Option<Timestamp>> {
684    map.get(key)
685        .map(|v| {
686            Timestamp::from_str(v, Some(&query_ctx.timezone()))
687                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
688        })
689        .transpose()
690}
691
692fn idents_to_full_database_name(
693    obj_name: &ObjectName,
694    query_ctx: &QueryContextRef,
695) -> Result<(String, String)> {
696    match &obj_name.0[..] {
697        [database] => Ok((
698            query_ctx.current_catalog().to_owned(),
699            database.value.clone(),
700        )),
701        [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
702        _ => InvalidSqlSnafu {
703            err_msg: format!(
704                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
705            ),
706        }
707        .fail(),
708    }
709}
710
711#[cfg(test)]
712mod tests {
713    use std::assert_matches::assert_matches;
714    use std::collections::HashMap;
715
716    use common_time::range::TimestampRange;
717    use common_time::{Timestamp, Timezone};
718    use session::context::QueryContextBuilder;
719    use sql::statements::OptionMap;
720
721    use crate::error;
722    use crate::statement::copy_database::{
723        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
724    };
725    use crate::statement::timestamp_range_from_option_map;
726
727    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
728        let query_ctx = QueryContextBuilder::default()
729            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
730            .build()
731            .into();
732        let map = OptionMap::from(
733            [
734                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
735                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
736            ]
737            .into_iter()
738            .collect::<HashMap<_, _>>(),
739        );
740        timestamp_range_from_option_map(&map, &query_ctx)
741    }
742
743    #[test]
744    fn test_timestamp_range_from_option_map() {
745        assert_eq!(
746            Some(
747                TimestampRange::new(
748                    Timestamp::new_second(1649635200),
749                    Timestamp::new_second(1649664000),
750                )
751                .unwrap(),
752            ),
753            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
754        );
755
756        assert_matches!(
757            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
758            error::Error::InvalidTimestampRange { .. }
759        );
760    }
761}