1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21mod ddl;
22mod describe;
23mod dml;
24mod set;
25mod show;
26mod tql;
27
28use std::collections::HashMap;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32
33use async_stream::stream;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::CatalogManagerRef;
36use client::{OutputData, RecordBatches};
37use common_error::ext::BoxedError;
38use common_meta::cache::TableRouteCacheRef;
39use common_meta::cache_invalidator::CacheInvalidatorRef;
40use common_meta::ddl::ProcedureExecutorRef;
41use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
42use common_meta::key::schema_name::SchemaNameKey;
43use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
44use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
45use common_meta::kv_backend::KvBackendRef;
46use common_query::Output;
47use common_recordbatch::error::StreamTimeoutSnafu;
48use common_recordbatch::RecordBatchStreamWrapper;
49use common_telemetry::tracing;
50use common_time::range::TimestampRange;
51use common_time::Timestamp;
52use datafusion_expr::LogicalPlan;
53use futures::stream::{Stream, StreamExt};
54use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
55use query::parser::QueryStatement;
56use query::QueryEngineRef;
57use session::context::{Channel, QueryContextRef};
58use session::table_name::table_idents_to_full_name;
59use set::{set_query_timeout, set_read_preference};
60use snafu::{ensure, OptionExt, ResultExt};
61use sql::statements::copy::{
62 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
63};
64use sql::statements::set_variables::SetVariables;
65use sql::statements::show::ShowCreateTableVariant;
66use sql::statements::statement::Statement;
67use sql::statements::OptionMap;
68use sql::util::format_raw_object_name;
69use sqlparser::ast::ObjectName;
70use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
71use table::table_name::TableName;
72use table::table_reference::TableReference;
73use table::TableRef;
74
75use self::set::{
76 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
77};
78use crate::error::{
79 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
80 PlanStatementSnafu, Result, SchemaNotFoundSnafu, StatementTimeoutSnafu,
81 TableMetadataManagerSnafu, TableNotFoundSnafu, UpgradeCatalogManagerRefSnafu,
82};
83use crate::insert::InserterRef;
84use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
85
86#[derive(Clone)]
87pub struct StatementExecutor {
88 catalog_manager: CatalogManagerRef,
89 query_engine: QueryEngineRef,
90 procedure_executor: ProcedureExecutorRef,
91 table_metadata_manager: TableMetadataManagerRef,
92 flow_metadata_manager: FlowMetadataManagerRef,
93 view_info_manager: ViewInfoManagerRef,
94 partition_manager: PartitionRuleManagerRef,
95 cache_invalidator: CacheInvalidatorRef,
96 inserter: InserterRef,
97}
98
99pub type StatementExecutorRef = Arc<StatementExecutor>;
100
101impl StatementExecutor {
102 pub fn new(
103 catalog_manager: CatalogManagerRef,
104 query_engine: QueryEngineRef,
105 procedure_executor: ProcedureExecutorRef,
106 kv_backend: KvBackendRef,
107 cache_invalidator: CacheInvalidatorRef,
108 inserter: InserterRef,
109 table_route_cache: TableRouteCacheRef,
110 ) -> Self {
111 Self {
112 catalog_manager,
113 query_engine,
114 procedure_executor,
115 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
116 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
117 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
118 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
119 cache_invalidator,
120 inserter,
121 }
122 }
123
124 #[cfg(feature = "testing")]
125 pub async fn execute_stmt(
126 &self,
127 stmt: QueryStatement,
128 query_ctx: QueryContextRef,
129 ) -> Result<Output> {
130 match stmt {
131 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
132 QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
133 }
134 }
135
136 #[tracing::instrument(skip_all)]
137 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
138 match stmt {
139 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
140 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
141 }
142
143 Statement::DeclareCursor(declare_cursor) => {
144 self.declare_cursor(declare_cursor, query_ctx).await
145 }
146 Statement::FetchCursor(fetch_cursor) => {
147 self.fetch_cursor(fetch_cursor, query_ctx).await
148 }
149 Statement::CloseCursor(close_cursor) => {
150 self.close_cursor(close_cursor, query_ctx).await
151 }
152
153 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
154
155 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
156
157 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
158
159 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
160
161 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
162
163 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
164
165 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
166
167 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
168
169 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
170
171 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
172
173 #[cfg(feature = "enterprise")]
174 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
175
176 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
177 let query_output = self
178 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
179 .await?;
180 let req = to_copy_query_request(stmt.arg)?;
181
182 self.copy_query_to(req, query_output)
183 .await
184 .map(Output::new_with_affected_rows)
185 }
186
187 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
188 let req = to_copy_table_request(stmt, query_ctx.clone())?;
189 match req.direction {
190 CopyDirection::Export => self
191 .copy_table_to(req, query_ctx)
192 .await
193 .map(Output::new_with_affected_rows),
194 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
195 }
196 }
197
198 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
199 match copy_database {
200 CopyDatabase::To(arg) => {
201 self.copy_database_to(
202 to_copy_database_request(arg, &query_ctx)?,
203 query_ctx.clone(),
204 )
205 .await
206 }
207 CopyDatabase::From(arg) => {
208 self.copy_database_from(
209 to_copy_database_request(arg, &query_ctx)?,
210 query_ctx,
211 )
212 .await
213 }
214 }
215 }
216
217 Statement::CreateTable(stmt) => {
218 let _ = self.create_table(stmt, query_ctx).await?;
219 Ok(Output::new_with_affected_rows(0))
220 }
221 Statement::CreateTableLike(stmt) => {
222 let _ = self.create_table_like(stmt, query_ctx).await?;
223 Ok(Output::new_with_affected_rows(0))
224 }
225 Statement::CreateExternalTable(stmt) => {
226 let _ = self.create_external_table(stmt, query_ctx).await?;
227 Ok(Output::new_with_affected_rows(0))
228 }
229 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
230 #[cfg(feature = "enterprise")]
231 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
232 Statement::DropFlow(stmt) => {
233 self.drop_flow(
234 query_ctx.current_catalog().to_string(),
235 format_raw_object_name(stmt.flow_name()),
236 stmt.drop_if_exists(),
237 query_ctx,
238 )
239 .await
240 }
241 Statement::CreateView(stmt) => {
242 let _ = self.create_view(stmt, query_ctx).await?;
243 Ok(Output::new_with_affected_rows(0))
244 }
245 Statement::DropView(stmt) => {
246 let (catalog_name, schema_name, view_name) =
247 table_idents_to_full_name(&stmt.view_name, &query_ctx)
248 .map_err(BoxedError::new)
249 .context(ExternalSnafu)?;
250
251 self.drop_view(
252 catalog_name,
253 schema_name,
254 view_name,
255 stmt.drop_if_exists,
256 query_ctx,
257 )
258 .await
259 }
260 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
261
262 Statement::AlterDatabase(alter_database) => {
263 self.alter_database(alter_database, query_ctx).await
264 }
265
266 Statement::DropTable(stmt) => {
267 let mut table_names = Vec::with_capacity(stmt.table_names().len());
268 for table_name_stmt in stmt.table_names() {
269 let (catalog, schema, table) =
270 table_idents_to_full_name(table_name_stmt, &query_ctx)
271 .map_err(BoxedError::new)
272 .context(ExternalSnafu)?;
273 table_names.push(TableName::new(catalog, schema, table));
274 }
275 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
276 .await
277 }
278 Statement::DropDatabase(stmt) => {
279 self.drop_database(
280 query_ctx.current_catalog().to_string(),
281 format_raw_object_name(stmt.name()),
282 stmt.drop_if_exists(),
283 query_ctx,
284 )
285 .await
286 }
287 Statement::TruncateTable(stmt) => {
288 let (catalog, schema, table) =
289 table_idents_to_full_name(stmt.table_name(), &query_ctx)
290 .map_err(BoxedError::new)
291 .context(ExternalSnafu)?;
292 let table_name = TableName::new(catalog, schema, table);
293 self.truncate_table(table_name, query_ctx).await
294 }
295 Statement::CreateDatabase(stmt) => {
296 self.create_database(
297 &format_raw_object_name(&stmt.name),
298 stmt.if_not_exists,
299 stmt.options.into_map(),
300 query_ctx,
301 )
302 .await
303 }
304 Statement::ShowCreateDatabase(show) => {
305 let (catalog, database) =
306 idents_to_full_database_name(&show.database_name, &query_ctx)
307 .map_err(BoxedError::new)
308 .context(ExternalSnafu)?;
309 let table_metadata_manager = self
310 .catalog_manager
311 .as_any()
312 .downcast_ref::<KvBackendCatalogManager>()
313 .map(|manager| manager.table_metadata_manager_ref().clone())
314 .context(UpgradeCatalogManagerRefSnafu)?;
315 let opts: HashMap<String, String> = table_metadata_manager
316 .schema_manager()
317 .get(SchemaNameKey::new(&catalog, &database))
318 .await
319 .context(TableMetadataManagerSnafu)?
320 .context(SchemaNotFoundSnafu {
321 schema_info: &database,
322 })?
323 .into_inner()
324 .into();
325
326 self.show_create_database(&database, opts.into()).await
327 }
328 Statement::ShowCreateTable(show) => {
329 let (catalog, schema, table) =
330 table_idents_to_full_name(&show.table_name, &query_ctx)
331 .map_err(BoxedError::new)
332 .context(ExternalSnafu)?;
333
334 let table_ref = self
335 .catalog_manager
336 .table(&catalog, &schema, &table, Some(&query_ctx))
337 .await
338 .context(CatalogSnafu)?
339 .context(TableNotFoundSnafu { table_name: &table })?;
340 let table_name = TableName::new(catalog, schema, table);
341
342 match show.variant {
343 ShowCreateTableVariant::Original => {
344 self.show_create_table(table_name, table_ref, query_ctx)
345 .await
346 }
347 ShowCreateTableVariant::PostgresForeignTable => {
348 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
349 .await
350 }
351 }
352 }
353 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
354 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
355 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
356 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
357 Statement::ShowColumns(show_columns) => {
358 self.show_columns(show_columns, query_ctx).await
359 }
360 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
361 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
362 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
363 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
364 Statement::Use(db) => self.use_database(db, query_ctx).await,
365 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
366 }
367 }
368
369 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
370 let catalog = query_ctx.current_catalog();
371 ensure!(
372 self.catalog_manager
373 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
374 .await
375 .context(CatalogSnafu)?,
376 SchemaNotFoundSnafu { schema_info: &db }
377 );
378
379 query_ctx.set_current_schema(&db);
380
381 Ok(Output::new_with_record_batches(RecordBatches::empty()))
382 }
383
384 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
385 let var_name = set_var.variable.to_string().to_uppercase();
386
387 match var_name.as_str() {
388 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
389
390 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
391 set_timezone(set_var.value, query_ctx)?
392 }
393
394 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
395
396 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
400
401 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
402 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
403 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
404 Channel::Postgres => {
405 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
406 }
407 _ => {
408 return NotSupportedSnafu {
409 feat: format!("Unsupported set variable {}", var_name),
410 }
411 .fail()
412 }
413 },
414 "STATEMENT_TIMEOUT" => {
415 if query_ctx.channel() == Channel::Postgres {
416 set_query_timeout(set_var.value, query_ctx)?
417 } else {
418 return NotSupportedSnafu {
419 feat: format!("Unsupported set variable {}", var_name),
420 }
421 .fail();
422 }
423 }
424 "SEARCH_PATH" => {
425 if query_ctx.channel() == Channel::Postgres {
426 set_search_path(set_var.value, query_ctx)?
427 } else {
428 return NotSupportedSnafu {
429 feat: format!("Unsupported set variable {}", var_name),
430 }
431 .fail();
432 }
433 }
434 _ => {
435 if query_ctx.channel() == Channel::Postgres {
440 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
441 } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
442 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
444 } else {
445 return NotSupportedSnafu {
446 feat: format!("Unsupported set variable {}", var_name),
447 }
448 .fail();
449 }
450 }
451 }
452 Ok(Output::new_with_affected_rows(0))
453 }
454
455 #[tracing::instrument(skip_all)]
456 pub async fn plan(
457 &self,
458 stmt: &QueryStatement,
459 query_ctx: QueryContextRef,
460 ) -> Result<LogicalPlan> {
461 self.query_engine
462 .planner()
463 .plan(stmt, query_ctx)
464 .await
465 .context(PlanStatementSnafu)
466 }
467
468 #[tracing::instrument(skip_all)]
470 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
471 self.query_engine
472 .execute(plan, query_ctx)
473 .await
474 .context(ExecLogicalPlanSnafu)
475 }
476
477 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
478 self.query_engine
479 .planner()
480 .optimize(plan)
481 .context(PlanStatementSnafu)
482 }
483
484 #[tracing::instrument(skip_all)]
485 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
486 let timeout = derive_timeout(&stmt, &query_ctx);
487 match timeout {
488 Some(timeout) => {
489 let start = tokio::time::Instant::now();
490 let output = tokio::time::timeout(timeout, self.plan_exec_inner(stmt, query_ctx))
491 .await
492 .context(StatementTimeoutSnafu)?;
493 let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
495 Ok(attach_timeout(output?, remaining_timeout))
496 }
497 None => self.plan_exec_inner(stmt, query_ctx).await,
498 }
499 }
500
501 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
502 let TableReference {
503 catalog,
504 schema,
505 table,
506 } = table_ref;
507 self.catalog_manager
508 .table(catalog, schema, table, None)
509 .await
510 .context(CatalogSnafu)?
511 .with_context(|| TableNotFoundSnafu {
512 table_name: table_ref.to_string(),
513 })
514 }
515
516 async fn plan_exec_inner(
517 &self,
518 stmt: QueryStatement,
519 query_ctx: QueryContextRef,
520 ) -> Result<Output> {
521 let plan = self.plan(&stmt, query_ctx.clone()).await?;
522 self.exec_plan(plan, query_ctx).await
523 }
524}
525
526fn attach_timeout(output: Output, mut timeout: Duration) -> Output {
527 match output.data {
528 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
529 OutputData::Stream(mut stream) => {
530 let schema = stream.schema();
531 let s = Box::pin(stream! {
532 let start = tokio::time::Instant::now();
533 while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.context(StreamTimeoutSnafu)? {
534 yield item;
535 timeout = timeout.checked_sub(tokio::time::Instant::now() - start).unwrap_or(Duration::ZERO);
536 }
537 }) as Pin<Box<dyn Stream<Item = _> + Send>>;
538 let stream = RecordBatchStreamWrapper {
539 schema,
540 stream: s,
541 output_ordering: None,
542 metrics: Default::default(),
543 };
544 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
545 }
546 }
547}
548
549fn derive_timeout(stmt: &QueryStatement, query_ctx: &QueryContextRef) -> Option<Duration> {
552 let query_timeout = query_ctx.query_timeout()?;
553 match (query_ctx.channel(), stmt) {
554 (Channel::Mysql, QueryStatement::Sql(Statement::Query(_)))
555 | (Channel::Postgres, QueryStatement::Sql(_)) => Some(query_timeout),
556 (_, _) => None,
557 }
558}
559
560fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
561 let CopyQueryToArgument {
562 with,
563 connection,
564 location,
565 } = stmt;
566
567 Ok(CopyQueryToRequest {
568 location,
569 with: with.into_map(),
570 connection: connection.into_map(),
571 })
572}
573
574fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
575 let direction = match stmt {
576 CopyTable::To(_) => CopyDirection::Export,
577 CopyTable::From(_) => CopyDirection::Import,
578 };
579
580 let CopyTableArgument {
581 location,
582 connection,
583 with,
584 table_name,
585 limit,
586 ..
587 } = match stmt {
588 CopyTable::To(arg) => arg,
589 CopyTable::From(arg) => arg,
590 };
591 let (catalog_name, schema_name, table_name) =
592 table_idents_to_full_name(&table_name, &query_ctx)
593 .map_err(BoxedError::new)
594 .context(ExternalSnafu)?;
595
596 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
597
598 let pattern = with
599 .get(common_datasource::file_format::FILE_PATTERN)
600 .cloned();
601
602 Ok(CopyTableRequest {
603 catalog_name,
604 schema_name,
605 table_name,
606 location,
607 with: with.into_map(),
608 connection: connection.into_map(),
609 pattern,
610 direction,
611 timestamp_range,
612 limit,
613 })
614}
615
616fn to_copy_database_request(
619 arg: CopyDatabaseArgument,
620 query_ctx: &QueryContextRef,
621) -> Result<CopyDatabaseRequest> {
622 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
623 .map_err(BoxedError::new)
624 .context(ExternalSnafu)?;
625 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
626
627 Ok(CopyDatabaseRequest {
628 catalog_name,
629 schema_name: database_name,
630 location: arg.location,
631 with: arg.with.into_map(),
632 connection: arg.connection.into_map(),
633 time_range,
634 })
635}
636
637fn timestamp_range_from_option_map(
641 options: &OptionMap,
642 query_ctx: &QueryContextRef,
643) -> Result<Option<TimestampRange>> {
644 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
645 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
646 let time_range = match (start_timestamp, end_timestamp) {
647 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
648 error::InvalidTimestampRangeSnafu {
649 start: start.to_iso8601_string(),
650 end: end.to_iso8601_string(),
651 }
652 })?),
653 (Some(start), None) => Some(TimestampRange::from_start(start)),
654 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
656 };
657 Ok(time_range)
658}
659
660fn extract_timestamp(
662 map: &OptionMap,
663 key: &str,
664 query_ctx: &QueryContextRef,
665) -> Result<Option<Timestamp>> {
666 map.get(key)
667 .map(|v| {
668 Timestamp::from_str(v, Some(&query_ctx.timezone()))
669 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
670 })
671 .transpose()
672}
673
674fn idents_to_full_database_name(
675 obj_name: &ObjectName,
676 query_ctx: &QueryContextRef,
677) -> Result<(String, String)> {
678 match &obj_name.0[..] {
679 [database] => Ok((
680 query_ctx.current_catalog().to_owned(),
681 database.value.clone(),
682 )),
683 [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
684 _ => InvalidSqlSnafu {
685 err_msg: format!(
686 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
687 ),
688 }
689 .fail(),
690 }
691}
692
693#[cfg(test)]
694mod tests {
695 use std::assert_matches::assert_matches;
696 use std::collections::HashMap;
697
698 use common_time::range::TimestampRange;
699 use common_time::{Timestamp, Timezone};
700 use session::context::QueryContextBuilder;
701 use sql::statements::OptionMap;
702
703 use crate::error;
704 use crate::statement::copy_database::{
705 COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
706 };
707 use crate::statement::timestamp_range_from_option_map;
708
709 fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
710 let query_ctx = QueryContextBuilder::default()
711 .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
712 .build()
713 .into();
714 let map = OptionMap::from(
715 [
716 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
717 (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
718 ]
719 .into_iter()
720 .collect::<HashMap<_, _>>(),
721 );
722 timestamp_range_from_option_map(&map, &query_ctx)
723 }
724
725 #[test]
726 fn test_timestamp_range_from_option_map() {
727 assert_eq!(
728 Some(
729 TimestampRange::new(
730 Timestamp::new_second(1649635200),
731 Timestamp::new_second(1649664000),
732 )
733 .unwrap(),
734 ),
735 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
736 );
737
738 assert_matches!(
739 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
740 error::Error::InvalidTimestampRange { .. }
741 );
742 }
743}