1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use catalog::kvbackend::KvBackendCatalogManager;
33use catalog::process_manager::ProcessManagerRef;
34use catalog::CatalogManagerRef;
35use client::RecordBatches;
36use common_error::ext::BoxedError;
37use common_meta::cache::TableRouteCacheRef;
38use common_meta::cache_invalidator::CacheInvalidatorRef;
39use common_meta::ddl::ProcedureExecutorRef;
40use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
41use common_meta::key::schema_name::SchemaNameKey;
42use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
43use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
44use common_meta::kv_backend::KvBackendRef;
45use common_query::Output;
46use common_telemetry::tracing;
47use common_time::range::TimestampRange;
48use common_time::Timestamp;
49use datafusion_expr::LogicalPlan;
50use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
51use query::parser::QueryStatement;
52use query::QueryEngineRef;
53use session::context::{Channel, QueryContextRef};
54use session::table_name::table_idents_to_full_name;
55use set::{set_query_timeout, set_read_preference};
56use snafu::{ensure, OptionExt, ResultExt};
57use sql::statements::copy::{
58 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
59};
60use sql::statements::set_variables::SetVariables;
61use sql::statements::show::ShowCreateTableVariant;
62use sql::statements::statement::Statement;
63use sql::statements::OptionMap;
64use sql::util::format_raw_object_name;
65use sqlparser::ast::ObjectName;
66use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
67use table::table_name::TableName;
68use table::table_reference::TableReference;
69use table::TableRef;
70
71use self::set::{
72 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
73};
74use crate::error::{
75 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
76 PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
77 UpgradeCatalogManagerRefSnafu,
78};
79use crate::insert::InserterRef;
80use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
81use crate::statement::set::set_allow_query_fallback;
82
/// Executes parsed statements on behalf of a frontend session.
///
/// The executor bundles the handles needed to run a statement: the catalog, the query
/// engine, the procedure executor used by DDL, several metadata managers, the inserter
/// and an optional process manager. It derives [`Clone`], so cloning copies every handle
/// it holds.
#[derive(Clone)]
pub struct StatementExecutor {
    /// Catalog used to resolve schemas and tables.
    catalog_manager: CatalogManagerRef,
    /// Engine used to plan, optimize and execute queries.
    query_engine: QueryEngineRef,
    /// Executor handle exposed through [`StatementExecutor::procedure_executor`].
    procedure_executor: ProcedureExecutorRef,
    /// Table metadata manager.
    table_metadata_manager: TableMetadataManagerRef,
    /// Flow metadata manager.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// View info manager.
    view_info_manager: ViewInfoManagerRef,
    /// Partition rule manager.
    partition_manager: PartitionRuleManagerRef,
    /// Invalidator handle exposed through [`StatementExecutor::cache_invalidator`].
    cache_invalidator: CacheInvalidatorRef,
    /// Inserter handle.
    inserter: InserterRef,
    /// Optional process manager; `None` when the caller did not provide one.
    process_manager: Option<ProcessManagerRef>,
}
96
/// Shared, reference-counted handle to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
98
99impl StatementExecutor {
100 #[allow(clippy::too_many_arguments)]
101 pub fn new(
102 catalog_manager: CatalogManagerRef,
103 query_engine: QueryEngineRef,
104 procedure_executor: ProcedureExecutorRef,
105 kv_backend: KvBackendRef,
106 cache_invalidator: CacheInvalidatorRef,
107 inserter: InserterRef,
108 table_route_cache: TableRouteCacheRef,
109 process_manager: Option<ProcessManagerRef>,
110 ) -> Self {
111 Self {
112 catalog_manager,
113 query_engine,
114 procedure_executor,
115 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
116 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
117 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
118 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
119 cache_invalidator,
120 inserter,
121 process_manager,
122 }
123 }
124
/// Executes either a SQL or a PromQL [`QueryStatement`].
///
/// SQL statements are unwrapped and forwarded to [`Self::execute_sql`]; PromQL statements
/// are planned and executed directly through the query engine. Only compiled when the
/// `testing` feature is enabled.
#[cfg(feature = "testing")]
pub async fn execute_stmt(
    &self,
    stmt: QueryStatement,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    match stmt {
        QueryStatement::Sql(sql) => self.execute_sql(sql, query_ctx).await,
        // Keep the statement intact: the planner expects the whole `QueryStatement`.
        promql @ QueryStatement::Promql(_) => self.plan_exec(promql, query_ctx).await,
    }
}
136
/// Executes a single parsed SQL [`Statement`] and returns its [`Output`].
///
/// This is the central dispatcher of the executor: every statement variant is routed to
/// the handler method that implements it. Queries, `EXPLAIN` and `DELETE` go through the
/// query engine (plan, then execute); the remaining variants are handled by dedicated
/// methods of this type.
///
/// # Errors
///
/// Propagates whatever the selected handler returns. In addition, arms that resolve an
/// object name wrap the resolution failure in an `External` error, and
/// `SHOW CREATE DATABASE` / `SHOW CREATE TABLE` fail with schema-not-found /
/// table-not-found errors when the target does not exist.
#[tracing::instrument(skip_all)]
pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
    match stmt {
        // Statements served by the query engine: plan first, then execute the plan.
        Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
            self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
        }

        // Cursor lifecycle statements.
        Statement::DeclareCursor(declare_cursor) => {
            self.declare_cursor(declare_cursor, query_ctx).await
        }
        Statement::FetchCursor(fetch_cursor) => {
            self.fetch_cursor(fetch_cursor, query_ctx).await
        }
        Statement::CloseCursor(close_cursor) => {
            self.close_cursor(close_cursor, query_ctx).await
        }

        Statement::Insert(insert) => self.insert(insert, query_ctx).await,

        Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

        Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

        Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

        Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

        Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,

        Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,

        Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,

        Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,

        Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,

        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,

        // COPY (<query>) TO: run the inner query first, then pass its output to
        // `copy_query_to`; the value it returns is reported as the affected row count.
        Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
            let query_output = self
                .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
                .await?;
            let req = to_copy_query_request(stmt.arg)?;

            self.copy_query_to(req, query_output)
                .await
                .map(Output::new_with_affected_rows)
        }

        // COPY <table> TO/FROM: the direction recorded in the request selects between
        // exporting and importing.
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
            let req = to_copy_table_request(stmt, query_ctx.clone())?;
            match req.direction {
                CopyDirection::Export => self
                    .copy_table_to(req, query_ctx)
                    .await
                    .map(Output::new_with_affected_rows),
                CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
            }
        }

        // COPY DATABASE TO/FROM.
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(arg) => {
                    self.copy_database_to(
                        to_copy_database_request(arg, &query_ctx)?,
                        query_ctx.clone(),
                    )
                    .await
                }
                CopyDatabase::From(arg) => {
                    self.copy_database_from(
                        to_copy_database_request(arg, &query_ctx)?,
                        query_ctx,
                    )
                    .await
                }
            }
        }

        // CREATE statements below discard the value returned by the handler and report
        // zero affected rows on success.
        Statement::CreateTable(stmt) => {
            let _ = self.create_table(stmt, query_ctx).await?;
            Ok(Output::new_with_affected_rows(0))
        }
        Statement::CreateTableLike(stmt) => {
            let _ = self.create_table_like(stmt, query_ctx).await?;
            Ok(Output::new_with_affected_rows(0))
        }
        Statement::CreateExternalTable(stmt) => {
            let _ = self.create_external_table(stmt, query_ctx).await?;
            Ok(Output::new_with_affected_rows(0))
        }
        Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
        // Flows are addressed by the session's current catalog plus the raw flow name.
        Statement::DropFlow(stmt) => {
            self.drop_flow(
                query_ctx.current_catalog().to_string(),
                format_raw_object_name(stmt.flow_name()),
                stmt.drop_if_exists(),
                query_ctx,
            )
            .await
        }
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(stmt) => {
            self.drop_trigger(
                query_ctx.current_catalog().to_string(),
                format_raw_object_name(stmt.trigger_name()),
                stmt.drop_if_exists(),
                query_ctx,
            )
            .await
        }
        Statement::CreateView(stmt) => {
            let _ = self.create_view(stmt, query_ctx).await?;
            Ok(Output::new_with_affected_rows(0))
        }
        Statement::DropView(stmt) => {
            // Expand the possibly partial view name using the session context.
            let (catalog_name, schema_name, view_name) =
                table_idents_to_full_name(&stmt.view_name, &query_ctx)
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

            self.drop_view(
                catalog_name,
                schema_name,
                view_name,
                stmt.drop_if_exists,
                query_ctx,
            )
            .await
        }
        Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,

        Statement::AlterDatabase(alter_database) => {
            self.alter_database(alter_database, query_ctx).await
        }

        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(alter_trigger) => {
            self.alter_trigger(alter_trigger, query_ctx).await
        }

        Statement::DropTable(stmt) => {
            // Resolve every name up front: a single unresolvable name aborts the
            // statement before `drop_tables` is called.
            let mut table_names = Vec::with_capacity(stmt.table_names().len());
            for table_name_stmt in stmt.table_names() {
                let (catalog, schema, table) =
                    table_idents_to_full_name(table_name_stmt, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                table_names.push(TableName::new(catalog, schema, table));
            }
            self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
                .await
        }
        // Databases are addressed by the session's current catalog plus the raw name.
        Statement::DropDatabase(stmt) => {
            self.drop_database(
                query_ctx.current_catalog().to_string(),
                format_raw_object_name(stmt.name()),
                stmt.drop_if_exists(),
                query_ctx,
            )
            .await
        }
        Statement::TruncateTable(stmt) => {
            let (catalog, schema, table) =
                table_idents_to_full_name(stmt.table_name(), &query_ctx)
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
            let table_name = TableName::new(catalog, schema, table);
            self.truncate_table(table_name, query_ctx).await
        }
        Statement::CreateDatabase(stmt) => {
            self.create_database(
                &format_raw_object_name(&stmt.name),
                stmt.if_not_exists,
                stmt.options.into_map(),
                query_ctx,
            )
            .await
        }
        Statement::ShowCreateDatabase(show) => {
            let (catalog, database) =
                idents_to_full_database_name(&show.database_name, &query_ctx)
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
            // The schema options are read through the metadata manager owned by the
            // kv-backend catalog manager, so the catalog manager must be of that
            // concrete type; otherwise the statement fails.
            let table_metadata_manager = self
                .catalog_manager
                .as_any()
                .downcast_ref::<KvBackendCatalogManager>()
                .map(|manager| manager.table_metadata_manager_ref().clone())
                .context(UpgradeCatalogManagerRefSnafu)?;
            // A missing schema entry is reported as schema-not-found; a found entry is
            // converted into a plain string-to-string option map.
            let opts: HashMap<String, String> = table_metadata_manager
                .schema_manager()
                .get(SchemaNameKey::new(&catalog, &database))
                .await
                .context(TableMetadataManagerSnafu)?
                .context(SchemaNotFoundSnafu {
                    schema_info: &database,
                })?
                .into_inner()
                .into();

            self.show_create_database(&database, opts.into()).await
        }
        Statement::ShowCreateTable(show) => {
            let (catalog, schema, table) =
                table_idents_to_full_name(&show.table_name, &query_ctx)
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

            // The table must exist; otherwise fail with table-not-found.
            let table_ref = self
                .catalog_manager
                .table(&catalog, &schema, &table, Some(&query_ctx))
                .await
                .context(CatalogSnafu)?
                .context(TableNotFoundSnafu { table_name: &table })?;
            let table_name = TableName::new(catalog, schema, table);

            // The requested variant selects which handler renders the statement.
            match show.variant {
                ShowCreateTableVariant::Original => {
                    self.show_create_table(table_name, table_ref, query_ctx)
                        .await
                }
                ShowCreateTableVariant::PostgresForeignTable => {
                    self.show_create_table_for_pg(table_name, table_ref, query_ctx)
                        .await
                }
            }
        }
        Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
        Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
        // Session variable handling is synchronous.
        Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
        Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
        Statement::ShowColumns(show_columns) => {
            self.show_columns(show_columns, query_ctx).await
        }
        Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
        Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
        Statement::ShowStatus(_) => self.show_status(query_ctx).await,
        Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
        Statement::Use(db) => self.use_database(db, query_ctx).await,
        Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
        Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
        Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
    }
}
386
387 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
388 let catalog = query_ctx.current_catalog();
389 ensure!(
390 self.catalog_manager
391 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
392 .await
393 .context(CatalogSnafu)?,
394 SchemaNotFoundSnafu { schema_info: &db }
395 );
396
397 query_ctx.set_current_schema(&db);
398
399 Ok(Output::new_with_record_batches(RecordBatches::empty()))
400 }
401
402 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
403 let var_name = set_var.variable.to_string().to_uppercase();
404
405 match var_name.as_str() {
406 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
407
408 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
409 set_timezone(set_var.value, query_ctx)?
410 }
411
412 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
413
414 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
418
419 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
421
422 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
423 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
424 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
425 Channel::Postgres => {
426 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
427 }
428 _ => {
429 return NotSupportedSnafu {
430 feat: format!("Unsupported set variable {}", var_name),
431 }
432 .fail()
433 }
434 },
435 "STATEMENT_TIMEOUT" => {
436 if query_ctx.channel() == Channel::Postgres {
437 set_query_timeout(set_var.value, query_ctx)?
438 } else {
439 return NotSupportedSnafu {
440 feat: format!("Unsupported set variable {}", var_name),
441 }
442 .fail();
443 }
444 }
445 "SEARCH_PATH" => {
446 if query_ctx.channel() == Channel::Postgres {
447 set_search_path(set_var.value, query_ctx)?
448 } else {
449 return NotSupportedSnafu {
450 feat: format!("Unsupported set variable {}", var_name),
451 }
452 .fail();
453 }
454 }
455 _ => {
456 if query_ctx.channel() == Channel::Postgres {
461 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
462 } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
463 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
465 } else {
466 return NotSupportedSnafu {
467 feat: format!("Unsupported set variable {}", var_name),
468 }
469 .fail();
470 }
471 }
472 }
473 Ok(Output::new_with_affected_rows(0))
474 }
475
476 #[tracing::instrument(skip_all)]
477 pub async fn plan(
478 &self,
479 stmt: &QueryStatement,
480 query_ctx: QueryContextRef,
481 ) -> Result<LogicalPlan> {
482 self.query_engine
483 .planner()
484 .plan(stmt, query_ctx)
485 .await
486 .context(PlanStatementSnafu)
487 }
488
489 #[tracing::instrument(skip_all)]
491 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
492 self.query_engine
493 .execute(plan, query_ctx)
494 .await
495 .context(ExecLogicalPlanSnafu)
496 }
497
498 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
499 self.query_engine
500 .planner()
501 .optimize(plan)
502 .context(PlanStatementSnafu)
503 }
504
505 #[tracing::instrument(skip_all)]
506 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
507 let plan = self.plan(&stmt, query_ctx.clone()).await?;
508 self.exec_plan(plan, query_ctx).await
509 }
510
511 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
512 let TableReference {
513 catalog,
514 schema,
515 table,
516 } = table_ref;
517 self.catalog_manager
518 .table(catalog, schema, table, None)
519 .await
520 .context(CatalogSnafu)?
521 .with_context(|| TableNotFoundSnafu {
522 table_name: table_ref.to_string(),
523 })
524 }
525
/// Returns a reference to the procedure executor held by this statement executor.
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
    &self.procedure_executor
}
529
/// Returns a reference to the cache invalidator held by this statement executor.
pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
    &self.cache_invalidator
}
533}
534
535fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
536 let CopyQueryToArgument {
537 with,
538 connection,
539 location,
540 } = stmt;
541
542 Ok(CopyQueryToRequest {
543 location,
544 with: with.into_map(),
545 connection: connection.into_map(),
546 })
547}
548
549fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
550 let direction = match stmt {
551 CopyTable::To(_) => CopyDirection::Export,
552 CopyTable::From(_) => CopyDirection::Import,
553 };
554
555 let CopyTableArgument {
556 location,
557 connection,
558 with,
559 table_name,
560 limit,
561 ..
562 } = match stmt {
563 CopyTable::To(arg) => arg,
564 CopyTable::From(arg) => arg,
565 };
566 let (catalog_name, schema_name, table_name) =
567 table_idents_to_full_name(&table_name, &query_ctx)
568 .map_err(BoxedError::new)
569 .context(ExternalSnafu)?;
570
571 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
572
573 let pattern = with
574 .get(common_datasource::file_format::FILE_PATTERN)
575 .cloned();
576
577 Ok(CopyTableRequest {
578 catalog_name,
579 schema_name,
580 table_name,
581 location,
582 with: with.into_map(),
583 connection: connection.into_map(),
584 pattern,
585 direction,
586 timestamp_range,
587 limit,
588 })
589}
590
591fn to_copy_database_request(
594 arg: CopyDatabaseArgument,
595 query_ctx: &QueryContextRef,
596) -> Result<CopyDatabaseRequest> {
597 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
598 .map_err(BoxedError::new)
599 .context(ExternalSnafu)?;
600 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
601
602 Ok(CopyDatabaseRequest {
603 catalog_name,
604 schema_name: database_name,
605 location: arg.location,
606 with: arg.with.into_map(),
607 connection: arg.connection.into_map(),
608 time_range,
609 })
610}
611
612fn timestamp_range_from_option_map(
616 options: &OptionMap,
617 query_ctx: &QueryContextRef,
618) -> Result<Option<TimestampRange>> {
619 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
620 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
621 let time_range = match (start_timestamp, end_timestamp) {
622 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
623 error::InvalidTimestampRangeSnafu {
624 start: start.to_iso8601_string(),
625 end: end.to_iso8601_string(),
626 }
627 })?),
628 (Some(start), None) => Some(TimestampRange::from_start(start)),
629 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
631 };
632 Ok(time_range)
633}
634
635fn extract_timestamp(
637 map: &OptionMap,
638 key: &str,
639 query_ctx: &QueryContextRef,
640) -> Result<Option<Timestamp>> {
641 map.get(key)
642 .map(|v| {
643 Timestamp::from_str(v, Some(&query_ctx.timezone()))
644 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
645 })
646 .transpose()
647}
648
649fn idents_to_full_database_name(
650 obj_name: &ObjectName,
651 query_ctx: &QueryContextRef,
652) -> Result<(String, String)> {
653 match &obj_name.0[..] {
654 [database] => Ok((
655 query_ctx.current_catalog().to_owned(),
656 database.value.clone(),
657 )),
658 [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
659 _ => InvalidSqlSnafu {
660 err_msg: format!(
661 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
662 ),
663 }
664 .fail(),
665 }
666}
667
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::timestamp_range_from_option_map;

    /// Parses a `(start, end)` pair of time options in a session whose time zone is
    /// `Asia/Shanghai` and returns the resulting range.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let shanghai = Timezone::from_tz_string("Asia/Shanghai").unwrap();
        let query_ctx = QueryContextBuilder::default()
            .timezone(shanghai)
            .build()
            .into();

        let options = HashMap::from([
            (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
            (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
        ]);
        let map = OptionMap::from(options);

        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // A well-ordered pair is converted using the session time zone.
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        let actual =
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap();
        assert_eq!(Some(expected), actual);

        // An end before the start must be rejected.
        let err =
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err();
        assert_matches!(err, error::Error::InvalidTimestampRange { .. });
    }
}
718}