operator/
statement.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21mod ddl;
22mod describe;
23mod dml;
24mod set;
25mod show;
26mod tql;
27
28use std::collections::HashMap;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32
33use async_stream::stream;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::CatalogManagerRef;
36use client::{OutputData, RecordBatches};
37use common_error::ext::BoxedError;
38use common_meta::cache::TableRouteCacheRef;
39use common_meta::cache_invalidator::CacheInvalidatorRef;
40use common_meta::ddl::ProcedureExecutorRef;
41use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
42use common_meta::key::schema_name::SchemaNameKey;
43use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
44use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
45use common_meta::kv_backend::KvBackendRef;
46use common_query::Output;
47use common_recordbatch::error::StreamTimeoutSnafu;
48use common_recordbatch::RecordBatchStreamWrapper;
49use common_telemetry::tracing;
50use common_time::range::TimestampRange;
51use common_time::Timestamp;
52use datafusion_expr::LogicalPlan;
53use futures::stream::{Stream, StreamExt};
54use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
55use query::parser::QueryStatement;
56use query::QueryEngineRef;
57use session::context::{Channel, QueryContextRef};
58use session::table_name::table_idents_to_full_name;
59use set::{set_query_timeout, set_read_preference};
60use snafu::{ensure, OptionExt, ResultExt};
61use sql::statements::copy::{
62    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
63};
64use sql::statements::set_variables::SetVariables;
65use sql::statements::show::ShowCreateTableVariant;
66use sql::statements::statement::Statement;
67use sql::statements::OptionMap;
68use sql::util::format_raw_object_name;
69use sqlparser::ast::ObjectName;
70use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
71use table::table_name::TableName;
72use table::table_reference::TableReference;
73use table::TableRef;
74
75use self::set::{
76    set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
77};
78use crate::error::{
79    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
80    PlanStatementSnafu, Result, SchemaNotFoundSnafu, StatementTimeoutSnafu,
81    TableMetadataManagerSnafu, TableNotFoundSnafu, UpgradeCatalogManagerRefSnafu,
82};
83use crate::insert::InserterRef;
84use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
85
86#[derive(Clone)]
87pub struct StatementExecutor {
88    catalog_manager: CatalogManagerRef,
89    query_engine: QueryEngineRef,
90    procedure_executor: ProcedureExecutorRef,
91    table_metadata_manager: TableMetadataManagerRef,
92    flow_metadata_manager: FlowMetadataManagerRef,
93    view_info_manager: ViewInfoManagerRef,
94    partition_manager: PartitionRuleManagerRef,
95    cache_invalidator: CacheInvalidatorRef,
96    inserter: InserterRef,
97}
98
99pub type StatementExecutorRef = Arc<StatementExecutor>;
100
101impl StatementExecutor {
102    pub fn new(
103        catalog_manager: CatalogManagerRef,
104        query_engine: QueryEngineRef,
105        procedure_executor: ProcedureExecutorRef,
106        kv_backend: KvBackendRef,
107        cache_invalidator: CacheInvalidatorRef,
108        inserter: InserterRef,
109        table_route_cache: TableRouteCacheRef,
110    ) -> Self {
111        Self {
112            catalog_manager,
113            query_engine,
114            procedure_executor,
115            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
116            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
117            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
118            partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
119            cache_invalidator,
120            inserter,
121        }
122    }
123
124    #[cfg(feature = "testing")]
125    pub async fn execute_stmt(
126        &self,
127        stmt: QueryStatement,
128        query_ctx: QueryContextRef,
129    ) -> Result<Output> {
130        match stmt {
131            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
132            QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
133        }
134    }
135
136    #[tracing::instrument(skip_all)]
137    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
138        match stmt {
139            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
140                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
141            }
142
143            Statement::DeclareCursor(declare_cursor) => {
144                self.declare_cursor(declare_cursor, query_ctx).await
145            }
146            Statement::FetchCursor(fetch_cursor) => {
147                self.fetch_cursor(fetch_cursor, query_ctx).await
148            }
149            Statement::CloseCursor(close_cursor) => {
150                self.close_cursor(close_cursor, query_ctx).await
151            }
152
153            Statement::Insert(insert) => self.insert(insert, query_ctx).await,
154
155            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
156
157            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
158
159            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
160
161            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
162
163            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
164
165            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
166
167            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
168
169            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
170
171            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
172
173            #[cfg(feature = "enterprise")]
174            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
175
176            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
177                let query_output = self
178                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
179                    .await?;
180                let req = to_copy_query_request(stmt.arg)?;
181
182                self.copy_query_to(req, query_output)
183                    .await
184                    .map(Output::new_with_affected_rows)
185            }
186
187            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
188                let req = to_copy_table_request(stmt, query_ctx.clone())?;
189                match req.direction {
190                    CopyDirection::Export => self
191                        .copy_table_to(req, query_ctx)
192                        .await
193                        .map(Output::new_with_affected_rows),
194                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
195                }
196            }
197
198            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
199                match copy_database {
200                    CopyDatabase::To(arg) => {
201                        self.copy_database_to(
202                            to_copy_database_request(arg, &query_ctx)?,
203                            query_ctx.clone(),
204                        )
205                        .await
206                    }
207                    CopyDatabase::From(arg) => {
208                        self.copy_database_from(
209                            to_copy_database_request(arg, &query_ctx)?,
210                            query_ctx,
211                        )
212                        .await
213                    }
214                }
215            }
216
217            Statement::CreateTable(stmt) => {
218                let _ = self.create_table(stmt, query_ctx).await?;
219                Ok(Output::new_with_affected_rows(0))
220            }
221            Statement::CreateTableLike(stmt) => {
222                let _ = self.create_table_like(stmt, query_ctx).await?;
223                Ok(Output::new_with_affected_rows(0))
224            }
225            Statement::CreateExternalTable(stmt) => {
226                let _ = self.create_external_table(stmt, query_ctx).await?;
227                Ok(Output::new_with_affected_rows(0))
228            }
229            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
230            #[cfg(feature = "enterprise")]
231            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
232            Statement::DropFlow(stmt) => {
233                self.drop_flow(
234                    query_ctx.current_catalog().to_string(),
235                    format_raw_object_name(stmt.flow_name()),
236                    stmt.drop_if_exists(),
237                    query_ctx,
238                )
239                .await
240            }
241            Statement::CreateView(stmt) => {
242                let _ = self.create_view(stmt, query_ctx).await?;
243                Ok(Output::new_with_affected_rows(0))
244            }
245            Statement::DropView(stmt) => {
246                let (catalog_name, schema_name, view_name) =
247                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
248                        .map_err(BoxedError::new)
249                        .context(ExternalSnafu)?;
250
251                self.drop_view(
252                    catalog_name,
253                    schema_name,
254                    view_name,
255                    stmt.drop_if_exists,
256                    query_ctx,
257                )
258                .await
259            }
260            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
261
262            Statement::AlterDatabase(alter_database) => {
263                self.alter_database(alter_database, query_ctx).await
264            }
265
266            Statement::DropTable(stmt) => {
267                let mut table_names = Vec::with_capacity(stmt.table_names().len());
268                for table_name_stmt in stmt.table_names() {
269                    let (catalog, schema, table) =
270                        table_idents_to_full_name(table_name_stmt, &query_ctx)
271                            .map_err(BoxedError::new)
272                            .context(ExternalSnafu)?;
273                    table_names.push(TableName::new(catalog, schema, table));
274                }
275                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
276                    .await
277            }
278            Statement::DropDatabase(stmt) => {
279                self.drop_database(
280                    query_ctx.current_catalog().to_string(),
281                    format_raw_object_name(stmt.name()),
282                    stmt.drop_if_exists(),
283                    query_ctx,
284                )
285                .await
286            }
287            Statement::TruncateTable(stmt) => {
288                let (catalog, schema, table) =
289                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
290                        .map_err(BoxedError::new)
291                        .context(ExternalSnafu)?;
292                let table_name = TableName::new(catalog, schema, table);
293                self.truncate_table(table_name, query_ctx).await
294            }
295            Statement::CreateDatabase(stmt) => {
296                self.create_database(
297                    &format_raw_object_name(&stmt.name),
298                    stmt.if_not_exists,
299                    stmt.options.into_map(),
300                    query_ctx,
301                )
302                .await
303            }
304            Statement::ShowCreateDatabase(show) => {
305                let (catalog, database) =
306                    idents_to_full_database_name(&show.database_name, &query_ctx)
307                        .map_err(BoxedError::new)
308                        .context(ExternalSnafu)?;
309                let table_metadata_manager = self
310                    .catalog_manager
311                    .as_any()
312                    .downcast_ref::<KvBackendCatalogManager>()
313                    .map(|manager| manager.table_metadata_manager_ref().clone())
314                    .context(UpgradeCatalogManagerRefSnafu)?;
315                let opts: HashMap<String, String> = table_metadata_manager
316                    .schema_manager()
317                    .get(SchemaNameKey::new(&catalog, &database))
318                    .await
319                    .context(TableMetadataManagerSnafu)?
320                    .context(SchemaNotFoundSnafu {
321                        schema_info: &database,
322                    })?
323                    .into_inner()
324                    .into();
325
326                self.show_create_database(&database, opts.into()).await
327            }
328            Statement::ShowCreateTable(show) => {
329                let (catalog, schema, table) =
330                    table_idents_to_full_name(&show.table_name, &query_ctx)
331                        .map_err(BoxedError::new)
332                        .context(ExternalSnafu)?;
333
334                let table_ref = self
335                    .catalog_manager
336                    .table(&catalog, &schema, &table, Some(&query_ctx))
337                    .await
338                    .context(CatalogSnafu)?
339                    .context(TableNotFoundSnafu { table_name: &table })?;
340                let table_name = TableName::new(catalog, schema, table);
341
342                match show.variant {
343                    ShowCreateTableVariant::Original => {
344                        self.show_create_table(table_name, table_ref, query_ctx)
345                            .await
346                    }
347                    ShowCreateTableVariant::PostgresForeignTable => {
348                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
349                            .await
350                    }
351                }
352            }
353            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
354            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
355            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
356            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
357            Statement::ShowColumns(show_columns) => {
358                self.show_columns(show_columns, query_ctx).await
359            }
360            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
361            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
362            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
363            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
364            Statement::Use(db) => self.use_database(db, query_ctx).await,
365            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
366        }
367    }
368
369    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
370        let catalog = query_ctx.current_catalog();
371        ensure!(
372            self.catalog_manager
373                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
374                .await
375                .context(CatalogSnafu)?,
376            SchemaNotFoundSnafu { schema_info: &db }
377        );
378
379        query_ctx.set_current_schema(&db);
380
381        Ok(Output::new_with_record_batches(RecordBatches::empty()))
382    }
383
384    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
385        let var_name = set_var.variable.to_string().to_uppercase();
386
387        match var_name.as_str() {
388            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
389
390            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
391                set_timezone(set_var.value, query_ctx)?
392            }
393
394            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
395
396            // Same as "bytea_output", we just ignore it here.
397            // Not harmful since it only relates to how date is viewed in client app's output.
398            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
399            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
400
401            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
402            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
403                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
404                Channel::Postgres => {
405                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
406                }
407                _ => {
408                    return NotSupportedSnafu {
409                        feat: format!("Unsupported set variable {}", var_name),
410                    }
411                    .fail()
412                }
413            },
414            "STATEMENT_TIMEOUT" => {
415                if query_ctx.channel() == Channel::Postgres {
416                    set_query_timeout(set_var.value, query_ctx)?
417                } else {
418                    return NotSupportedSnafu {
419                        feat: format!("Unsupported set variable {}", var_name),
420                    }
421                    .fail();
422                }
423            }
424            "SEARCH_PATH" => {
425                if query_ctx.channel() == Channel::Postgres {
426                    set_search_path(set_var.value, query_ctx)?
427                } else {
428                    return NotSupportedSnafu {
429                        feat: format!("Unsupported set variable {}", var_name),
430                    }
431                    .fail();
432                }
433            }
434            _ => {
435                // for postgres, we give unknown SET statements a warning with
436                //  success, this is prevent the SET call becoming a blocker
437                //  of connection establishment
438                //
439                if query_ctx.channel() == Channel::Postgres {
440                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
441                } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
442                    // Just ignore `SET @@` commands for MySQL
443                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
444                } else {
445                    return NotSupportedSnafu {
446                        feat: format!("Unsupported set variable {}", var_name),
447                    }
448                    .fail();
449                }
450            }
451        }
452        Ok(Output::new_with_affected_rows(0))
453    }
454
455    #[tracing::instrument(skip_all)]
456    pub async fn plan(
457        &self,
458        stmt: &QueryStatement,
459        query_ctx: QueryContextRef,
460    ) -> Result<LogicalPlan> {
461        self.query_engine
462            .planner()
463            .plan(stmt, query_ctx)
464            .await
465            .context(PlanStatementSnafu)
466    }
467
468    /// Execute [`LogicalPlan`] directly.
469    #[tracing::instrument(skip_all)]
470    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
471        self.query_engine
472            .execute(plan, query_ctx)
473            .await
474            .context(ExecLogicalPlanSnafu)
475    }
476
477    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
478        self.query_engine
479            .planner()
480            .optimize(plan)
481            .context(PlanStatementSnafu)
482    }
483
484    #[tracing::instrument(skip_all)]
485    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
486        let timeout = derive_timeout(&stmt, &query_ctx);
487        match timeout {
488            Some(timeout) => {
489                let start = tokio::time::Instant::now();
490                let output = tokio::time::timeout(timeout, self.plan_exec_inner(stmt, query_ctx))
491                    .await
492                    .context(StatementTimeoutSnafu)?;
493                // compute remaining timeout
494                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
495                Ok(attach_timeout(output?, remaining_timeout))
496            }
497            None => self.plan_exec_inner(stmt, query_ctx).await,
498        }
499    }
500
501    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
502        let TableReference {
503            catalog,
504            schema,
505            table,
506        } = table_ref;
507        self.catalog_manager
508            .table(catalog, schema, table, None)
509            .await
510            .context(CatalogSnafu)?
511            .with_context(|| TableNotFoundSnafu {
512                table_name: table_ref.to_string(),
513            })
514    }
515
516    async fn plan_exec_inner(
517        &self,
518        stmt: QueryStatement,
519        query_ctx: QueryContextRef,
520    ) -> Result<Output> {
521        let plan = self.plan(&stmt, query_ctx.clone()).await?;
522        self.exec_plan(plan, query_ctx).await
523    }
524}
525
526fn attach_timeout(output: Output, mut timeout: Duration) -> Output {
527    match output.data {
528        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
529        OutputData::Stream(mut stream) => {
530            let schema = stream.schema();
531            let s = Box::pin(stream! {
532                let start = tokio::time::Instant::now();
533                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.context(StreamTimeoutSnafu)? {
534                    yield item;
535                    timeout = timeout.checked_sub(tokio::time::Instant::now() - start).unwrap_or(Duration::ZERO);
536                }
537            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
538            let stream = RecordBatchStreamWrapper {
539                schema,
540                stream: s,
541                output_ordering: None,
542                metrics: Default::default(),
543            };
544            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
545        }
546    }
547}
548
549/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
550/// For MySQL, it applies only to read-only statements.
551fn derive_timeout(stmt: &QueryStatement, query_ctx: &QueryContextRef) -> Option<Duration> {
552    let query_timeout = query_ctx.query_timeout()?;
553    match (query_ctx.channel(), stmt) {
554        (Channel::Mysql, QueryStatement::Sql(Statement::Query(_)))
555        | (Channel::Postgres, QueryStatement::Sql(_)) => Some(query_timeout),
556        (_, _) => None,
557    }
558}
559
560fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
561    let CopyQueryToArgument {
562        with,
563        connection,
564        location,
565    } = stmt;
566
567    Ok(CopyQueryToRequest {
568        location,
569        with: with.into_map(),
570        connection: connection.into_map(),
571    })
572}
573
574fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
575    let direction = match stmt {
576        CopyTable::To(_) => CopyDirection::Export,
577        CopyTable::From(_) => CopyDirection::Import,
578    };
579
580    let CopyTableArgument {
581        location,
582        connection,
583        with,
584        table_name,
585        limit,
586        ..
587    } = match stmt {
588        CopyTable::To(arg) => arg,
589        CopyTable::From(arg) => arg,
590    };
591    let (catalog_name, schema_name, table_name) =
592        table_idents_to_full_name(&table_name, &query_ctx)
593            .map_err(BoxedError::new)
594            .context(ExternalSnafu)?;
595
596    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
597
598    let pattern = with
599        .get(common_datasource::file_format::FILE_PATTERN)
600        .cloned();
601
602    Ok(CopyTableRequest {
603        catalog_name,
604        schema_name,
605        table_name,
606        location,
607        with: with.into_map(),
608        connection: connection.into_map(),
609        pattern,
610        direction,
611        timestamp_range,
612        limit,
613    })
614}
615
616/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
617/// This function extracts the necessary info including catalog/database name, time range, etc.
618fn to_copy_database_request(
619    arg: CopyDatabaseArgument,
620    query_ctx: &QueryContextRef,
621) -> Result<CopyDatabaseRequest> {
622    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
623        .map_err(BoxedError::new)
624        .context(ExternalSnafu)?;
625    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
626
627    Ok(CopyDatabaseRequest {
628        catalog_name,
629        schema_name: database_name,
630        location: arg.location,
631        with: arg.with.into_map(),
632        connection: arg.connection.into_map(),
633        time_range,
634    })
635}
636
637/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
638/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
639/// The timezone used for conversion will respect that inside `query_ctx`.
640fn timestamp_range_from_option_map(
641    options: &OptionMap,
642    query_ctx: &QueryContextRef,
643) -> Result<Option<TimestampRange>> {
644    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
645    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
646    let time_range = match (start_timestamp, end_timestamp) {
647        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
648            error::InvalidTimestampRangeSnafu {
649                start: start.to_iso8601_string(),
650                end: end.to_iso8601_string(),
651            }
652        })?),
653        (Some(start), None) => Some(TimestampRange::from_start(start)),
654        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
655        (None, None) => None,
656    };
657    Ok(time_range)
658}
659
660/// Extracts timestamp from a [HashMap<String, String>] with given key.
661fn extract_timestamp(
662    map: &OptionMap,
663    key: &str,
664    query_ctx: &QueryContextRef,
665) -> Result<Option<Timestamp>> {
666    map.get(key)
667        .map(|v| {
668            Timestamp::from_str(v, Some(&query_ctx.timezone()))
669                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
670        })
671        .transpose()
672}
673
674fn idents_to_full_database_name(
675    obj_name: &ObjectName,
676    query_ctx: &QueryContextRef,
677) -> Result<(String, String)> {
678    match &obj_name.0[..] {
679        [database] => Ok((
680            query_ctx.current_catalog().to_owned(),
681            database.value.clone(),
682        )),
683        [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
684        _ => InvalidSqlSnafu {
685            err_msg: format!(
686                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
687            ),
688        }
689        .fail(),
690    }
691}
692
693#[cfg(test)]
694mod tests {
695    use std::assert_matches::assert_matches;
696    use std::collections::HashMap;
697
698    use common_time::range::TimestampRange;
699    use common_time::{Timestamp, Timezone};
700    use session::context::QueryContextBuilder;
701    use sql::statements::OptionMap;
702
703    use crate::error;
704    use crate::statement::copy_database::{
705        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
706    };
707    use crate::statement::timestamp_range_from_option_map;
708
709    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
710        let query_ctx = QueryContextBuilder::default()
711            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
712            .build()
713            .into();
714        let map = OptionMap::from(
715            [
716                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
717                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
718            ]
719            .into_iter()
720            .collect::<HashMap<_, _>>(),
721        );
722        timestamp_range_from_option_map(&map, &query_ctx)
723    }
724
725    #[test]
726    fn test_timestamp_range_from_option_map() {
727        assert_eq!(
728            Some(
729                TimestampRange::new(
730                    Timestamp::new_second(1649635200),
731                    Timestamp::new_second(1649664000),
732                )
733                .unwrap(),
734            ),
735            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
736        );
737
738        assert_matches!(
739            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
740            error::Error::InvalidTimestampRange { .. }
741        );
742    }
743}