operator/
statement.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use catalog::kvbackend::KvBackendCatalogManager;
33use catalog::process_manager::ProcessManagerRef;
34use catalog::CatalogManagerRef;
35use client::RecordBatches;
36use common_error::ext::BoxedError;
37use common_meta::cache::TableRouteCacheRef;
38use common_meta::cache_invalidator::CacheInvalidatorRef;
39use common_meta::ddl::ProcedureExecutorRef;
40use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
41use common_meta::key::schema_name::SchemaNameKey;
42use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
43use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
44use common_meta::kv_backend::KvBackendRef;
45use common_query::Output;
46use common_telemetry::tracing;
47use common_time::range::TimestampRange;
48use common_time::Timestamp;
49use datafusion_expr::LogicalPlan;
50use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
51use query::parser::QueryStatement;
52use query::QueryEngineRef;
53use session::context::{Channel, QueryContextRef};
54use session::table_name::table_idents_to_full_name;
55use set::{set_query_timeout, set_read_preference};
56use snafu::{ensure, OptionExt, ResultExt};
57use sql::statements::copy::{
58    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
59};
60use sql::statements::set_variables::SetVariables;
61use sql::statements::show::ShowCreateTableVariant;
62use sql::statements::statement::Statement;
63use sql::statements::OptionMap;
64use sql::util::format_raw_object_name;
65use sqlparser::ast::ObjectName;
66use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
67use table::table_name::TableName;
68use table::table_reference::TableReference;
69use table::TableRef;
70
71use self::set::{
72    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
73};
74use crate::error::{
75    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
76    PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
77    UpgradeCatalogManagerRefSnafu,
78};
79use crate::insert::InserterRef;
80use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
81use crate::statement::set::set_allow_query_fallback;
82
/// Executes parsed statements on behalf of a session.
///
/// The executor bundles the handles that the statement handlers need. It is cheap to
/// share: see [`StatementExecutorRef`].
#[derive(Clone)]
pub struct StatementExecutor {
    /// Used to check schema existence and to resolve tables by name.
    catalog_manager: CatalogManagerRef,
    /// Plans statements and executes logical plans.
    query_engine: QueryEngineRef,
    /// Exposed through [`StatementExecutor::procedure_executor`].
    procedure_executor: ProcedureExecutorRef,
    /// Built from the key-value backend passed to [`StatementExecutor::new`].
    table_metadata_manager: TableMetadataManagerRef,
    /// Built from the key-value backend passed to [`StatementExecutor::new`].
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Built from the key-value backend passed to [`StatementExecutor::new`].
    view_info_manager: ViewInfoManagerRef,
    /// Built from the key-value backend and the table route cache passed to
    /// [`StatementExecutor::new`].
    partition_manager: PartitionRuleManagerRef,
    /// Exposed through [`StatementExecutor::cache_invalidator`].
    cache_invalidator: CacheInvalidatorRef,
    inserter: InserterRef,
    /// Optional; `None` when no process manager was supplied at construction.
    process_manager: Option<ProcessManagerRef>,
}
96
/// Shared, reference-counted handle to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
98
99impl StatementExecutor {
100    #[allow(clippy::too_many_arguments)]
101    pub fn new(
102        catalog_manager: CatalogManagerRef,
103        query_engine: QueryEngineRef,
104        procedure_executor: ProcedureExecutorRef,
105        kv_backend: KvBackendRef,
106        cache_invalidator: CacheInvalidatorRef,
107        inserter: InserterRef,
108        table_route_cache: TableRouteCacheRef,
109        process_manager: Option<ProcessManagerRef>,
110    ) -> Self {
111        Self {
112            catalog_manager,
113            query_engine,
114            procedure_executor,
115            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
116            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
117            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
118            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
119            cache_invalidator,
120            inserter,
121            process_manager,
122        }
123    }
124
125    #[cfg(feature = "testing")]
126    pub async fn execute_stmt(
127        &self,
128        stmt: QueryStatement,
129        query_ctx: QueryContextRef,
130    ) -> Result<Output> {
131        match stmt {
132            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
133            QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
134        }
135    }
136
137    #[tracing::instrument(skip_all)]
138    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
139        match stmt {
140            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
141                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
142            }
143
144            Statement::DeclareCursor(declare_cursor) => {
145                self.declare_cursor(declare_cursor, query_ctx).await
146            }
147            Statement::FetchCursor(fetch_cursor) => {
148                self.fetch_cursor(fetch_cursor, query_ctx).await
149            }
150            Statement::CloseCursor(close_cursor) => {
151                self.close_cursor(close_cursor, query_ctx).await
152            }
153
154            Statement::Insert(insert) => self.insert(insert, query_ctx).await,
155
156            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
157
158            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
159
160            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
161
162            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
163
164            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
165
166            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
167
168            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
169
170            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
171
172            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
173
174            #[cfg(feature = "enterprise")]
175            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
176
177            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
178                let query_output = self
179                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
180                    .await?;
181                let req = to_copy_query_request(stmt.arg)?;
182
183                self.copy_query_to(req, query_output)
184                    .await
185                    .map(Output::new_with_affected_rows)
186            }
187
188            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
189                let req = to_copy_table_request(stmt, query_ctx.clone())?;
190                match req.direction {
191                    CopyDirection::Export => self
192                        .copy_table_to(req, query_ctx)
193                        .await
194                        .map(Output::new_with_affected_rows),
195                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
196                }
197            }
198
199            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
200                match copy_database {
201                    CopyDatabase::To(arg) => {
202                        self.copy_database_to(
203                            to_copy_database_request(arg, &query_ctx)?,
204                            query_ctx.clone(),
205                        )
206                        .await
207                    }
208                    CopyDatabase::From(arg) => {
209                        self.copy_database_from(
210                            to_copy_database_request(arg, &query_ctx)?,
211                            query_ctx,
212                        )
213                        .await
214                    }
215                }
216            }
217
218            Statement::CreateTable(stmt) => {
219                let _ = self.create_table(stmt, query_ctx).await?;
220                Ok(Output::new_with_affected_rows(0))
221            }
222            Statement::CreateTableLike(stmt) => {
223                let _ = self.create_table_like(stmt, query_ctx).await?;
224                Ok(Output::new_with_affected_rows(0))
225            }
226            Statement::CreateExternalTable(stmt) => {
227                let _ = self.create_external_table(stmt, query_ctx).await?;
228                Ok(Output::new_with_affected_rows(0))
229            }
230            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
231            #[cfg(feature = "enterprise")]
232            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
233            Statement::DropFlow(stmt) => {
234                self.drop_flow(
235                    query_ctx.current_catalog().to_string(),
236                    format_raw_object_name(stmt.flow_name()),
237                    stmt.drop_if_exists(),
238                    query_ctx,
239                )
240                .await
241            }
242            #[cfg(feature = "enterprise")]
243            Statement::DropTrigger(stmt) => {
244                self.drop_trigger(
245                    query_ctx.current_catalog().to_string(),
246                    format_raw_object_name(stmt.trigger_name()),
247                    stmt.drop_if_exists(),
248                    query_ctx,
249                )
250                .await
251            }
252            Statement::CreateView(stmt) => {
253                let _ = self.create_view(stmt, query_ctx).await?;
254                Ok(Output::new_with_affected_rows(0))
255            }
256            Statement::DropView(stmt) => {
257                let (catalog_name, schema_name, view_name) =
258                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
259                        .map_err(BoxedError::new)
260                        .context(ExternalSnafu)?;
261
262                self.drop_view(
263                    catalog_name,
264                    schema_name,
265                    view_name,
266                    stmt.drop_if_exists,
267                    query_ctx,
268                )
269                .await
270            }
271            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
272
273            Statement::AlterDatabase(alter_database) => {
274                self.alter_database(alter_database, query_ctx).await
275            }
276
277            #[cfg(feature = "enterprise")]
278            Statement::AlterTrigger(alter_trigger) => {
279                self.alter_trigger(alter_trigger, query_ctx).await
280            }
281
282            Statement::DropTable(stmt) => {
283                let mut table_names = Vec::with_capacity(stmt.table_names().len());
284                for table_name_stmt in stmt.table_names() {
285                    let (catalog, schema, table) =
286                        table_idents_to_full_name(table_name_stmt, &query_ctx)
287                            .map_err(BoxedError::new)
288                            .context(ExternalSnafu)?;
289                    table_names.push(TableName::new(catalog, schema, table));
290                }
291                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
292                    .await
293            }
294            Statement::DropDatabase(stmt) => {
295                self.drop_database(
296                    query_ctx.current_catalog().to_string(),
297                    format_raw_object_name(stmt.name()),
298                    stmt.drop_if_exists(),
299                    query_ctx,
300                )
301                .await
302            }
303            Statement::TruncateTable(stmt) => {
304                let (catalog, schema, table) =
305                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
306                        .map_err(BoxedError::new)
307                        .context(ExternalSnafu)?;
308                let table_name = TableName::new(catalog, schema, table);
309                self.truncate_table(table_name, query_ctx).await
310            }
311            Statement::CreateDatabase(stmt) => {
312                self.create_database(
313                    &format_raw_object_name(&stmt.name),
314                    stmt.if_not_exists,
315                    stmt.options.into_map(),
316                    query_ctx,
317                )
318                .await
319            }
320            Statement::ShowCreateDatabase(show) => {
321                let (catalog, database) =
322                    idents_to_full_database_name(&show.database_name, &query_ctx)
323                        .map_err(BoxedError::new)
324                        .context(ExternalSnafu)?;
325                let table_metadata_manager = self
326                    .catalog_manager
327                    .as_any()
328                    .downcast_ref::<KvBackendCatalogManager>()
329                    .map(|manager| manager.table_metadata_manager_ref().clone())
330                    .context(UpgradeCatalogManagerRefSnafu)?;
331                let opts: HashMap<String, String> = table_metadata_manager
332                    .schema_manager()
333                    .get(SchemaNameKey::new(&catalog, &database))
334                    .await
335                    .context(TableMetadataManagerSnafu)?
336                    .context(SchemaNotFoundSnafu {
337                        schema_info: &database,
338                    })?
339                    .into_inner()
340                    .into();
341
342                self.show_create_database(&database, opts.into()).await
343            }
344            Statement::ShowCreateTable(show) => {
345                let (catalog, schema, table) =
346                    table_idents_to_full_name(&show.table_name, &query_ctx)
347                        .map_err(BoxedError::new)
348                        .context(ExternalSnafu)?;
349
350                let table_ref = self
351                    .catalog_manager
352                    .table(&catalog, &schema, &table, Some(&query_ctx))
353                    .await
354                    .context(CatalogSnafu)?
355                    .context(TableNotFoundSnafu { table_name: &table })?;
356                let table_name = TableName::new(catalog, schema, table);
357
358                match show.variant {
359                    ShowCreateTableVariant::Original => {
360                        self.show_create_table(table_name, table_ref, query_ctx)
361                            .await
362                    }
363                    ShowCreateTableVariant::PostgresForeignTable => {
364                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
365                            .await
366                    }
367                }
368            }
369            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
370            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
371            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
372            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
373            Statement::ShowColumns(show_columns) => {
374                self.show_columns(show_columns, query_ctx).await
375            }
376            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
377            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
378            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
379            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
380            Statement::Use(db) => self.use_database(db, query_ctx).await,
381            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
382            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
383            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
384        }
385    }
386
387    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
388        let catalog = query_ctx.current_catalog();
389        ensure!(
390            self.catalog_manager
391                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
392                .await
393                .context(CatalogSnafu)?,
394            SchemaNotFoundSnafu { schema_info: &db }
395        );
396
397        query_ctx.set_current_schema(&db);
398
399        Ok(Output::new_with_record_batches(RecordBatches::empty()))
400    }
401
402    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
403        let var_name = set_var.variable.to_string().to_uppercase();
404
405        match var_name.as_str() {
406            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
407
408            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
409                set_timezone(set_var.value, query_ctx)?
410            }
411
412            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
413
414            // Same as "bytea_output", we just ignore it here.
415            // Not harmful since it only relates to how date is viewed in client app's output.
416            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
417            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
418
419            // Allow query to fallback when failed to push down.
420            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
421
422            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
423            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
424                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
425                Channel::Postgres => {
426                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
427                }
428                _ => {
429                    return NotSupportedSnafu {
430                        feat: format!("Unsupported set variable {}", var_name),
431                    }
432                    .fail()
433                }
434            },
435            "STATEMENT_TIMEOUT" => {
436                if query_ctx.channel() == Channel::Postgres {
437                    set_query_timeout(set_var.value, query_ctx)?
438                } else {
439                    return NotSupportedSnafu {
440                        feat: format!("Unsupported set variable {}", var_name),
441                    }
442                    .fail();
443                }
444            }
445            "SEARCH_PATH" => {
446                if query_ctx.channel() == Channel::Postgres {
447                    set_search_path(set_var.value, query_ctx)?
448                } else {
449                    return NotSupportedSnafu {
450                        feat: format!("Unsupported set variable {}", var_name),
451                    }
452                    .fail();
453                }
454            }
455            _ => {
456                // for postgres, we give unknown SET statements a warning with
457                //  success, this is prevent the SET call becoming a blocker
458                //  of connection establishment
459                //
460                if query_ctx.channel() == Channel::Postgres {
461                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
462                } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
463                    // Just ignore `SET @@` commands for MySQL
464                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
465                } else {
466                    return NotSupportedSnafu {
467                        feat: format!("Unsupported set variable {}", var_name),
468                    }
469                    .fail();
470                }
471            }
472        }
473        Ok(Output::new_with_affected_rows(0))
474    }
475
476    #[tracing::instrument(skip_all)]
477    pub async fn plan(
478        &self,
479        stmt: &QueryStatement,
480        query_ctx: QueryContextRef,
481    ) -> Result<LogicalPlan> {
482        self.query_engine
483            .planner()
484            .plan(stmt, query_ctx)
485            .await
486            .context(PlanStatementSnafu)
487    }
488
489    /// Execute [`LogicalPlan`] directly.
490    #[tracing::instrument(skip_all)]
491    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
492        self.query_engine
493            .execute(plan, query_ctx)
494            .await
495            .context(ExecLogicalPlanSnafu)
496    }
497
498    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
499        self.query_engine
500            .planner()
501            .optimize(plan)
502            .context(PlanStatementSnafu)
503    }
504
505    #[tracing::instrument(skip_all)]
506    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
507        let plan = self.plan(&stmt, query_ctx.clone()).await?;
508        self.exec_plan(plan, query_ctx).await
509    }
510
511    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
512        let TableReference {
513            catalog,
514            schema,
515            table,
516        } = table_ref;
517        self.catalog_manager
518            .table(catalog, schema, table, None)
519            .await
520            .context(CatalogSnafu)?
521            .with_context(|| TableNotFoundSnafu {
522                table_name: table_ref.to_string(),
523            })
524    }
525
    /// Returns a reference to the procedure executor held by this statement executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }
529
    /// Returns a reference to the cache invalidator held by this statement executor.
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }
533}
534
535fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
536    let CopyQueryToArgument {
537        with,
538        connection,
539        location,
540    } = stmt;
541
542    Ok(CopyQueryToRequest {
543        location,
544        with: with.into_map(),
545        connection: connection.into_map(),
546    })
547}
548
549fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
550    let direction = match stmt {
551        CopyTable::To(_) => CopyDirection::Export,
552        CopyTable::From(_) => CopyDirection::Import,
553    };
554
555    let CopyTableArgument {
556        location,
557        connection,
558        with,
559        table_name,
560        limit,
561        ..
562    } = match stmt {
563        CopyTable::To(arg) => arg,
564        CopyTable::From(arg) => arg,
565    };
566    let (catalog_name, schema_name, table_name) =
567        table_idents_to_full_name(&table_name, &query_ctx)
568            .map_err(BoxedError::new)
569            .context(ExternalSnafu)?;
570
571    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
572
573    let pattern = with
574        .get(common_datasource::file_format::FILE_PATTERN)
575        .cloned();
576
577    Ok(CopyTableRequest {
578        catalog_name,
579        schema_name,
580        table_name,
581        location,
582        with: with.into_map(),
583        connection: connection.into_map(),
584        pattern,
585        direction,
586        timestamp_range,
587        limit,
588    })
589}
590
591/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
592/// This function extracts the necessary info including catalog/database name, time range, etc.
593fn to_copy_database_request(
594    arg: CopyDatabaseArgument,
595    query_ctx: &QueryContextRef,
596) -> Result<CopyDatabaseRequest> {
597    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
598        .map_err(BoxedError::new)
599        .context(ExternalSnafu)?;
600    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
601
602    Ok(CopyDatabaseRequest {
603        catalog_name,
604        schema_name: database_name,
605        location: arg.location,
606        with: arg.with.into_map(),
607        connection: arg.connection.into_map(),
608        time_range,
609    })
610}
611
612/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
613/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
614/// The timezone used for conversion will respect that inside `query_ctx`.
615fn timestamp_range_from_option_map(
616    options: &OptionMap,
617    query_ctx: &QueryContextRef,
618) -> Result<Option<TimestampRange>> {
619    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
620    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
621    let time_range = match (start_timestamp, end_timestamp) {
622        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
623            error::InvalidTimestampRangeSnafu {
624                start: start.to_iso8601_string(),
625                end: end.to_iso8601_string(),
626            }
627        })?),
628        (Some(start), None) => Some(TimestampRange::from_start(start)),
629        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
630        (None, None) => None,
631    };
632    Ok(time_range)
633}
634
635/// Extracts timestamp from a [HashMap<String, String>] with given key.
636fn extract_timestamp(
637    map: &OptionMap,
638    key: &str,
639    query_ctx: &QueryContextRef,
640) -> Result<Option<Timestamp>> {
641    map.get(key)
642        .map(|v| {
643            Timestamp::from_str(v, Some(&query_ctx.timezone()))
644                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
645        })
646        .transpose()
647}
648
649fn idents_to_full_database_name(
650    obj_name: &ObjectName,
651    query_ctx: &QueryContextRef,
652) -> Result<(String, String)> {
653    match &obj_name.0[..] {
654        [database] => Ok((
655            query_ctx.current_catalog().to_owned(),
656            database.value.clone(),
657        )),
658        [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
659        _ => InvalidSqlSnafu {
660            err_msg: format!(
661                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
662            ),
663        }
664        .fail(),
665    }
666}
667
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::timestamp_range_from_option_map;

    /// Resolves a `(start, end)` pair of timestamp strings into a range, using a query
    /// context whose timezone is `Asia/Shanghai`.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let shanghai = Timezone::from_tz_string("Asia/Shanghai").unwrap();
        let query_ctx = QueryContextBuilder::default()
            .timezone(shanghai)
            .build()
            .into();

        let raw_options: HashMap<String, String> = HashMap::from([
            (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
            (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
        ]);
        let options = OptionMap::from(raw_options);

        timestamp_range_from_option_map(&options, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // A well-ordered pair resolves to the matching range in epoch seconds.
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        let actual = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap();
        assert_eq!(Some(expected), actual);

        // An end that precedes the start is rejected.
        let err = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err();
        assert_matches!(err, error::Error::InvalidTimestampRange { .. });
    }
}
718}