1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::pin::Pin;
31use std::sync::Arc;
32use std::time::Duration;
33
34use async_stream::stream;
35use catalog::kvbackend::KvBackendCatalogManager;
36use catalog::process_manager::ProcessManagerRef;
37use catalog::CatalogManagerRef;
38use client::{OutputData, RecordBatches};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::ddl::ProcedureExecutorRef;
43use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
44use common_meta::key::schema_name::SchemaNameKey;
45use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
46use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
47use common_meta::kv_backend::KvBackendRef;
48use common_query::Output;
49use common_recordbatch::error::StreamTimeoutSnafu;
50use common_recordbatch::RecordBatchStreamWrapper;
51use common_telemetry::tracing;
52use common_time::range::TimestampRange;
53use common_time::Timestamp;
54use datafusion_expr::LogicalPlan;
55use futures::stream::{Stream, StreamExt};
56use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
57use query::parser::QueryStatement;
58use query::QueryEngineRef;
59use session::context::{Channel, QueryContextRef};
60use session::table_name::table_idents_to_full_name;
61use set::{set_query_timeout, set_read_preference};
62use snafu::{ensure, OptionExt, ResultExt};
63use sql::statements::copy::{
64 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
65};
66use sql::statements::set_variables::SetVariables;
67use sql::statements::show::ShowCreateTableVariant;
68use sql::statements::statement::Statement;
69use sql::statements::OptionMap;
70use sql::util::format_raw_object_name;
71use sqlparser::ast::ObjectName;
72use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
73use table::table_name::TableName;
74use table::table_reference::TableReference;
75use table::TableRef;
76
77use self::set::{
78 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
79};
80use crate::error::{
81 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
82 PlanStatementSnafu, Result, SchemaNotFoundSnafu, StatementTimeoutSnafu,
83 TableMetadataManagerSnafu, TableNotFoundSnafu, UpgradeCatalogManagerRefSnafu,
84};
85use crate::insert::InserterRef;
86use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
87
88#[derive(Clone)]
89pub struct StatementExecutor {
90 catalog_manager: CatalogManagerRef,
91 query_engine: QueryEngineRef,
92 procedure_executor: ProcedureExecutorRef,
93 table_metadata_manager: TableMetadataManagerRef,
94 flow_metadata_manager: FlowMetadataManagerRef,
95 view_info_manager: ViewInfoManagerRef,
96 partition_manager: PartitionRuleManagerRef,
97 cache_invalidator: CacheInvalidatorRef,
98 inserter: InserterRef,
99 process_manager: Option<ProcessManagerRef>,
100}
101
102pub type StatementExecutorRef = Arc<StatementExecutor>;
103
104impl StatementExecutor {
105 #[allow(clippy::too_many_arguments)]
106 pub fn new(
107 catalog_manager: CatalogManagerRef,
108 query_engine: QueryEngineRef,
109 procedure_executor: ProcedureExecutorRef,
110 kv_backend: KvBackendRef,
111 cache_invalidator: CacheInvalidatorRef,
112 inserter: InserterRef,
113 table_route_cache: TableRouteCacheRef,
114 process_manager: Option<ProcessManagerRef>,
115 ) -> Self {
116 Self {
117 catalog_manager,
118 query_engine,
119 procedure_executor,
120 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
121 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
122 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
123 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
124 cache_invalidator,
125 inserter,
126 process_manager,
127 }
128 }
129
130 #[cfg(feature = "testing")]
131 pub async fn execute_stmt(
132 &self,
133 stmt: QueryStatement,
134 query_ctx: QueryContextRef,
135 ) -> Result<Output> {
136 match stmt {
137 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
138 QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
139 }
140 }
141
142 #[tracing::instrument(skip_all)]
143 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
144 match stmt {
145 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
146 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
147 }
148
149 Statement::DeclareCursor(declare_cursor) => {
150 self.declare_cursor(declare_cursor, query_ctx).await
151 }
152 Statement::FetchCursor(fetch_cursor) => {
153 self.fetch_cursor(fetch_cursor, query_ctx).await
154 }
155 Statement::CloseCursor(close_cursor) => {
156 self.close_cursor(close_cursor, query_ctx).await
157 }
158
159 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
160
161 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
162
163 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
164
165 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
166
167 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
168
169 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
170
171 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
172
173 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
174
175 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
176
177 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
178
179 #[cfg(feature = "enterprise")]
180 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
181
182 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
183 let query_output = self
184 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
185 .await?;
186 let req = to_copy_query_request(stmt.arg)?;
187
188 self.copy_query_to(req, query_output)
189 .await
190 .map(Output::new_with_affected_rows)
191 }
192
193 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
194 let req = to_copy_table_request(stmt, query_ctx.clone())?;
195 match req.direction {
196 CopyDirection::Export => self
197 .copy_table_to(req, query_ctx)
198 .await
199 .map(Output::new_with_affected_rows),
200 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
201 }
202 }
203
204 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
205 match copy_database {
206 CopyDatabase::To(arg) => {
207 self.copy_database_to(
208 to_copy_database_request(arg, &query_ctx)?,
209 query_ctx.clone(),
210 )
211 .await
212 }
213 CopyDatabase::From(arg) => {
214 self.copy_database_from(
215 to_copy_database_request(arg, &query_ctx)?,
216 query_ctx,
217 )
218 .await
219 }
220 }
221 }
222
223 Statement::CreateTable(stmt) => {
224 let _ = self.create_table(stmt, query_ctx).await?;
225 Ok(Output::new_with_affected_rows(0))
226 }
227 Statement::CreateTableLike(stmt) => {
228 let _ = self.create_table_like(stmt, query_ctx).await?;
229 Ok(Output::new_with_affected_rows(0))
230 }
231 Statement::CreateExternalTable(stmt) => {
232 let _ = self.create_external_table(stmt, query_ctx).await?;
233 Ok(Output::new_with_affected_rows(0))
234 }
235 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
236 #[cfg(feature = "enterprise")]
237 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
238 Statement::DropFlow(stmt) => {
239 self.drop_flow(
240 query_ctx.current_catalog().to_string(),
241 format_raw_object_name(stmt.flow_name()),
242 stmt.drop_if_exists(),
243 query_ctx,
244 )
245 .await
246 }
247 #[cfg(feature = "enterprise")]
248 Statement::DropTrigger(stmt) => {
249 self.drop_trigger(
250 query_ctx.current_catalog().to_string(),
251 format_raw_object_name(stmt.trigger_name()),
252 stmt.drop_if_exists(),
253 query_ctx,
254 )
255 .await
256 }
257 Statement::CreateView(stmt) => {
258 let _ = self.create_view(stmt, query_ctx).await?;
259 Ok(Output::new_with_affected_rows(0))
260 }
261 Statement::DropView(stmt) => {
262 let (catalog_name, schema_name, view_name) =
263 table_idents_to_full_name(&stmt.view_name, &query_ctx)
264 .map_err(BoxedError::new)
265 .context(ExternalSnafu)?;
266
267 self.drop_view(
268 catalog_name,
269 schema_name,
270 view_name,
271 stmt.drop_if_exists,
272 query_ctx,
273 )
274 .await
275 }
276 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
277
278 Statement::AlterDatabase(alter_database) => {
279 self.alter_database(alter_database, query_ctx).await
280 }
281
282 Statement::DropTable(stmt) => {
283 let mut table_names = Vec::with_capacity(stmt.table_names().len());
284 for table_name_stmt in stmt.table_names() {
285 let (catalog, schema, table) =
286 table_idents_to_full_name(table_name_stmt, &query_ctx)
287 .map_err(BoxedError::new)
288 .context(ExternalSnafu)?;
289 table_names.push(TableName::new(catalog, schema, table));
290 }
291 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
292 .await
293 }
294 Statement::DropDatabase(stmt) => {
295 self.drop_database(
296 query_ctx.current_catalog().to_string(),
297 format_raw_object_name(stmt.name()),
298 stmt.drop_if_exists(),
299 query_ctx,
300 )
301 .await
302 }
303 Statement::TruncateTable(stmt) => {
304 let (catalog, schema, table) =
305 table_idents_to_full_name(stmt.table_name(), &query_ctx)
306 .map_err(BoxedError::new)
307 .context(ExternalSnafu)?;
308 let table_name = TableName::new(catalog, schema, table);
309 self.truncate_table(table_name, query_ctx).await
310 }
311 Statement::CreateDatabase(stmt) => {
312 self.create_database(
313 &format_raw_object_name(&stmt.name),
314 stmt.if_not_exists,
315 stmt.options.into_map(),
316 query_ctx,
317 )
318 .await
319 }
320 Statement::ShowCreateDatabase(show) => {
321 let (catalog, database) =
322 idents_to_full_database_name(&show.database_name, &query_ctx)
323 .map_err(BoxedError::new)
324 .context(ExternalSnafu)?;
325 let table_metadata_manager = self
326 .catalog_manager
327 .as_any()
328 .downcast_ref::<KvBackendCatalogManager>()
329 .map(|manager| manager.table_metadata_manager_ref().clone())
330 .context(UpgradeCatalogManagerRefSnafu)?;
331 let opts: HashMap<String, String> = table_metadata_manager
332 .schema_manager()
333 .get(SchemaNameKey::new(&catalog, &database))
334 .await
335 .context(TableMetadataManagerSnafu)?
336 .context(SchemaNotFoundSnafu {
337 schema_info: &database,
338 })?
339 .into_inner()
340 .into();
341
342 self.show_create_database(&database, opts.into()).await
343 }
344 Statement::ShowCreateTable(show) => {
345 let (catalog, schema, table) =
346 table_idents_to_full_name(&show.table_name, &query_ctx)
347 .map_err(BoxedError::new)
348 .context(ExternalSnafu)?;
349
350 let table_ref = self
351 .catalog_manager
352 .table(&catalog, &schema, &table, Some(&query_ctx))
353 .await
354 .context(CatalogSnafu)?
355 .context(TableNotFoundSnafu { table_name: &table })?;
356 let table_name = TableName::new(catalog, schema, table);
357
358 match show.variant {
359 ShowCreateTableVariant::Original => {
360 self.show_create_table(table_name, table_ref, query_ctx)
361 .await
362 }
363 ShowCreateTableVariant::PostgresForeignTable => {
364 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
365 .await
366 }
367 }
368 }
369 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
370 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
371 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
372 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
373 Statement::ShowColumns(show_columns) => {
374 self.show_columns(show_columns, query_ctx).await
375 }
376 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
377 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
378 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
379 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
380 Statement::Use(db) => self.use_database(db, query_ctx).await,
381 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
382 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
383 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
384 }
385 }
386
387 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
388 let catalog = query_ctx.current_catalog();
389 ensure!(
390 self.catalog_manager
391 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
392 .await
393 .context(CatalogSnafu)?,
394 SchemaNotFoundSnafu { schema_info: &db }
395 );
396
397 query_ctx.set_current_schema(&db);
398
399 Ok(Output::new_with_record_batches(RecordBatches::empty()))
400 }
401
402 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
403 let var_name = set_var.variable.to_string().to_uppercase();
404
405 match var_name.as_str() {
406 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
407
408 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
409 set_timezone(set_var.value, query_ctx)?
410 }
411
412 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
413
414 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
418
419 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
420 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
421 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
422 Channel::Postgres => {
423 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
424 }
425 _ => {
426 return NotSupportedSnafu {
427 feat: format!("Unsupported set variable {}", var_name),
428 }
429 .fail()
430 }
431 },
432 "STATEMENT_TIMEOUT" => {
433 if query_ctx.channel() == Channel::Postgres {
434 set_query_timeout(set_var.value, query_ctx)?
435 } else {
436 return NotSupportedSnafu {
437 feat: format!("Unsupported set variable {}", var_name),
438 }
439 .fail();
440 }
441 }
442 "SEARCH_PATH" => {
443 if query_ctx.channel() == Channel::Postgres {
444 set_search_path(set_var.value, query_ctx)?
445 } else {
446 return NotSupportedSnafu {
447 feat: format!("Unsupported set variable {}", var_name),
448 }
449 .fail();
450 }
451 }
452 _ => {
453 if query_ctx.channel() == Channel::Postgres {
458 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
459 } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
460 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
462 } else {
463 return NotSupportedSnafu {
464 feat: format!("Unsupported set variable {}", var_name),
465 }
466 .fail();
467 }
468 }
469 }
470 Ok(Output::new_with_affected_rows(0))
471 }
472
473 #[tracing::instrument(skip_all)]
474 pub async fn plan(
475 &self,
476 stmt: &QueryStatement,
477 query_ctx: QueryContextRef,
478 ) -> Result<LogicalPlan> {
479 self.query_engine
480 .planner()
481 .plan(stmt, query_ctx)
482 .await
483 .context(PlanStatementSnafu)
484 }
485
486 #[tracing::instrument(skip_all)]
488 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
489 self.query_engine
490 .execute(plan, query_ctx)
491 .await
492 .context(ExecLogicalPlanSnafu)
493 }
494
495 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
496 self.query_engine
497 .planner()
498 .optimize(plan)
499 .context(PlanStatementSnafu)
500 }
501
502 #[tracing::instrument(skip_all)]
503 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
504 let timeout = derive_timeout(&stmt, &query_ctx);
505 match timeout {
506 Some(timeout) => {
507 let start = tokio::time::Instant::now();
508 let output = tokio::time::timeout(timeout, self.plan_exec_inner(stmt, query_ctx))
509 .await
510 .context(StatementTimeoutSnafu)?;
511 let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
513 Ok(attach_timeout(output?, remaining_timeout))
514 }
515 None => self.plan_exec_inner(stmt, query_ctx).await,
516 }
517 }
518
519 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
520 let TableReference {
521 catalog,
522 schema,
523 table,
524 } = table_ref;
525 self.catalog_manager
526 .table(catalog, schema, table, None)
527 .await
528 .context(CatalogSnafu)?
529 .with_context(|| TableNotFoundSnafu {
530 table_name: table_ref.to_string(),
531 })
532 }
533
534 async fn plan_exec_inner(
535 &self,
536 stmt: QueryStatement,
537 query_ctx: QueryContextRef,
538 ) -> Result<Output> {
539 let plan = self.plan(&stmt, query_ctx.clone()).await?;
540 self.exec_plan(plan, query_ctx).await
541 }
542}
543
544fn attach_timeout(output: Output, mut timeout: Duration) -> Output {
545 match output.data {
546 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
547 OutputData::Stream(mut stream) => {
548 let schema = stream.schema();
549 let s = Box::pin(stream! {
550 let start = tokio::time::Instant::now();
551 while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.context(StreamTimeoutSnafu)? {
552 yield item;
553 timeout = timeout.checked_sub(tokio::time::Instant::now() - start).unwrap_or(Duration::ZERO);
554 }
555 }) as Pin<Box<dyn Stream<Item = _> + Send>>;
556 let stream = RecordBatchStreamWrapper {
557 schema,
558 stream: s,
559 output_ordering: None,
560 metrics: Default::default(),
561 };
562 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
563 }
564 }
565}
566
567fn derive_timeout(stmt: &QueryStatement, query_ctx: &QueryContextRef) -> Option<Duration> {
570 let query_timeout = query_ctx.query_timeout()?;
571 match (query_ctx.channel(), stmt) {
572 (Channel::Mysql, QueryStatement::Sql(Statement::Query(_)))
573 | (Channel::Postgres, QueryStatement::Sql(_)) => Some(query_timeout),
574 (_, _) => None,
575 }
576}
577
578fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
579 let CopyQueryToArgument {
580 with,
581 connection,
582 location,
583 } = stmt;
584
585 Ok(CopyQueryToRequest {
586 location,
587 with: with.into_map(),
588 connection: connection.into_map(),
589 })
590}
591
592fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
593 let direction = match stmt {
594 CopyTable::To(_) => CopyDirection::Export,
595 CopyTable::From(_) => CopyDirection::Import,
596 };
597
598 let CopyTableArgument {
599 location,
600 connection,
601 with,
602 table_name,
603 limit,
604 ..
605 } = match stmt {
606 CopyTable::To(arg) => arg,
607 CopyTable::From(arg) => arg,
608 };
609 let (catalog_name, schema_name, table_name) =
610 table_idents_to_full_name(&table_name, &query_ctx)
611 .map_err(BoxedError::new)
612 .context(ExternalSnafu)?;
613
614 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
615
616 let pattern = with
617 .get(common_datasource::file_format::FILE_PATTERN)
618 .cloned();
619
620 Ok(CopyTableRequest {
621 catalog_name,
622 schema_name,
623 table_name,
624 location,
625 with: with.into_map(),
626 connection: connection.into_map(),
627 pattern,
628 direction,
629 timestamp_range,
630 limit,
631 })
632}
633
634fn to_copy_database_request(
637 arg: CopyDatabaseArgument,
638 query_ctx: &QueryContextRef,
639) -> Result<CopyDatabaseRequest> {
640 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
641 .map_err(BoxedError::new)
642 .context(ExternalSnafu)?;
643 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
644
645 Ok(CopyDatabaseRequest {
646 catalog_name,
647 schema_name: database_name,
648 location: arg.location,
649 with: arg.with.into_map(),
650 connection: arg.connection.into_map(),
651 time_range,
652 })
653}
654
655fn timestamp_range_from_option_map(
659 options: &OptionMap,
660 query_ctx: &QueryContextRef,
661) -> Result<Option<TimestampRange>> {
662 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
663 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
664 let time_range = match (start_timestamp, end_timestamp) {
665 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
666 error::InvalidTimestampRangeSnafu {
667 start: start.to_iso8601_string(),
668 end: end.to_iso8601_string(),
669 }
670 })?),
671 (Some(start), None) => Some(TimestampRange::from_start(start)),
672 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
674 };
675 Ok(time_range)
676}
677
678fn extract_timestamp(
680 map: &OptionMap,
681 key: &str,
682 query_ctx: &QueryContextRef,
683) -> Result<Option<Timestamp>> {
684 map.get(key)
685 .map(|v| {
686 Timestamp::from_str(v, Some(&query_ctx.timezone()))
687 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
688 })
689 .transpose()
690}
691
692fn idents_to_full_database_name(
693 obj_name: &ObjectName,
694 query_ctx: &QueryContextRef,
695) -> Result<(String, String)> {
696 match &obj_name.0[..] {
697 [database] => Ok((
698 query_ctx.current_catalog().to_owned(),
699 database.value.clone(),
700 )),
701 [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
702 _ => InvalidSqlSnafu {
703 err_msg: format!(
704 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
705 ),
706 }
707 .fail(),
708 }
709}
710
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::timestamp_range_from_option_map;

    /// Parses a `(start, end)` pair of time options with the session timezone
    /// set to `Asia/Shanghai`.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let timezone = Timezone::from_tz_string("Asia/Shanghai").unwrap();
        let query_ctx = QueryContextBuilder::default()
            .timezone(timezone)
            .build()
            .into();

        let raw_options: HashMap<String, String> = HashMap::from([
            (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
            (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
        ]);
        let options = OptionMap::from(raw_options);

        timestamp_range_from_option_map(&options, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // A well-ordered pair is interpreted in the session timezone.
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        let parsed = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap();
        assert_eq!(Some(expected), parsed);

        // An end before the start is rejected.
        let reversed = check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00"));
        assert_matches!(
            reversed.unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }
}