1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::v1::RowInsertRequests;
33use catalog::kvbackend::KvBackendCatalogManager;
34use catalog::process_manager::ProcessManagerRef;
35use catalog::CatalogManagerRef;
36use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
37use client::inserter::{InsertOptions, Inserter};
38use client::RecordBatches;
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::tracing;
50use common_time::range::TimestampRange;
51use common_time::Timestamp;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use humantime::format_duration;
55use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
56use query::parser::QueryStatement;
57use query::QueryEngineRef;
58use session::context::{Channel, QueryContextBuilder, QueryContextRef};
59use session::table_name::table_idents_to_full_name;
60use set::{set_query_timeout, set_read_preference};
61use snafu::{ensure, OptionExt, ResultExt};
62use sql::ast::ObjectNamePartExt;
63use sql::statements::copy::{
64 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
65};
66use sql::statements::set_variables::SetVariables;
67use sql::statements::show::ShowCreateTableVariant;
68use sql::statements::statement::Statement;
69use sql::statements::OptionMap;
70use sql::util::format_raw_object_name;
71use sqlparser::ast::ObjectName;
72use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
73use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
74use table::table_name::TableName;
75use table::table_reference::TableReference;
76use table::TableRef;
77
78use self::set::{
79 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
80};
81use crate::error::{
82 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
83 PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
84 TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
85};
86use crate::insert::InserterRef;
87use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
88use crate::statement::set::set_allow_query_fallback;
89
90#[derive(Clone)]
91pub struct StatementExecutor {
92 catalog_manager: CatalogManagerRef,
93 query_engine: QueryEngineRef,
94 procedure_executor: ProcedureExecutorRef,
95 table_metadata_manager: TableMetadataManagerRef,
96 flow_metadata_manager: FlowMetadataManagerRef,
97 view_info_manager: ViewInfoManagerRef,
98 partition_manager: PartitionRuleManagerRef,
99 cache_invalidator: CacheInvalidatorRef,
100 inserter: InserterRef,
101 process_manager: Option<ProcessManagerRef>,
102}
103
104pub type StatementExecutorRef = Arc<StatementExecutor>;
105
106impl StatementExecutor {
107 #[allow(clippy::too_many_arguments)]
108 pub fn new(
109 catalog_manager: CatalogManagerRef,
110 query_engine: QueryEngineRef,
111 procedure_executor: ProcedureExecutorRef,
112 kv_backend: KvBackendRef,
113 cache_invalidator: CacheInvalidatorRef,
114 inserter: InserterRef,
115 table_route_cache: TableRouteCacheRef,
116 process_manager: Option<ProcessManagerRef>,
117 ) -> Self {
118 Self {
119 catalog_manager,
120 query_engine,
121 procedure_executor,
122 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
123 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
124 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
125 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
126 cache_invalidator,
127 inserter,
128 process_manager,
129 }
130 }
131
132 #[cfg(feature = "testing")]
133 pub async fn execute_stmt(
134 &self,
135 stmt: QueryStatement,
136 query_ctx: QueryContextRef,
137 ) -> Result<Output> {
138 match stmt {
139 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
140 QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
141 }
142 }
143
144 #[tracing::instrument(skip_all)]
145 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
146 match stmt {
147 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
148 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
149 }
150
151 Statement::DeclareCursor(declare_cursor) => {
152 self.declare_cursor(declare_cursor, query_ctx).await
153 }
154 Statement::FetchCursor(fetch_cursor) => {
155 self.fetch_cursor(fetch_cursor, query_ctx).await
156 }
157 Statement::CloseCursor(close_cursor) => {
158 self.close_cursor(close_cursor, query_ctx).await
159 }
160
161 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
162
163 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
164
165 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
166
167 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
168
169 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
170
171 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
172
173 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
174
175 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
176
177 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
178
179 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
180
181 #[cfg(feature = "enterprise")]
182 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
183
184 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
185 let query_output = self
186 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
187 .await?;
188 let req = to_copy_query_request(stmt.arg)?;
189
190 self.copy_query_to(req, query_output)
191 .await
192 .map(Output::new_with_affected_rows)
193 }
194
195 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
196 let req = to_copy_table_request(stmt, query_ctx.clone())?;
197 match req.direction {
198 CopyDirection::Export => self
199 .copy_table_to(req, query_ctx)
200 .await
201 .map(Output::new_with_affected_rows),
202 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
203 }
204 }
205
206 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
207 match copy_database {
208 CopyDatabase::To(arg) => {
209 self.copy_database_to(
210 to_copy_database_request(arg, &query_ctx)?,
211 query_ctx.clone(),
212 )
213 .await
214 }
215 CopyDatabase::From(arg) => {
216 self.copy_database_from(
217 to_copy_database_request(arg, &query_ctx)?,
218 query_ctx,
219 )
220 .await
221 }
222 }
223 }
224
225 Statement::CreateTable(stmt) => {
226 let _ = self.create_table(stmt, query_ctx).await?;
227 Ok(Output::new_with_affected_rows(0))
228 }
229 Statement::CreateTableLike(stmt) => {
230 let _ = self.create_table_like(stmt, query_ctx).await?;
231 Ok(Output::new_with_affected_rows(0))
232 }
233 Statement::CreateExternalTable(stmt) => {
234 let _ = self.create_external_table(stmt, query_ctx).await?;
235 Ok(Output::new_with_affected_rows(0))
236 }
237 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
238 #[cfg(feature = "enterprise")]
239 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
240 Statement::DropFlow(stmt) => {
241 self.drop_flow(
242 query_ctx.current_catalog().to_string(),
243 format_raw_object_name(stmt.flow_name()),
244 stmt.drop_if_exists(),
245 query_ctx,
246 )
247 .await
248 }
249 #[cfg(feature = "enterprise")]
250 Statement::DropTrigger(stmt) => {
251 self.drop_trigger(
252 query_ctx.current_catalog().to_string(),
253 format_raw_object_name(stmt.trigger_name()),
254 stmt.drop_if_exists(),
255 query_ctx,
256 )
257 .await
258 }
259 Statement::CreateView(stmt) => {
260 let _ = self.create_view(stmt, query_ctx).await?;
261 Ok(Output::new_with_affected_rows(0))
262 }
263 Statement::DropView(stmt) => {
264 let (catalog_name, schema_name, view_name) =
265 table_idents_to_full_name(&stmt.view_name, &query_ctx)
266 .map_err(BoxedError::new)
267 .context(ExternalSnafu)?;
268
269 self.drop_view(
270 catalog_name,
271 schema_name,
272 view_name,
273 stmt.drop_if_exists,
274 query_ctx,
275 )
276 .await
277 }
278 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
279
280 Statement::AlterDatabase(alter_database) => {
281 self.alter_database(alter_database, query_ctx).await
282 }
283
284 #[cfg(feature = "enterprise")]
285 Statement::AlterTrigger(alter_trigger) => {
286 self.alter_trigger(alter_trigger, query_ctx).await
287 }
288
289 Statement::DropTable(stmt) => {
290 let mut table_names = Vec::with_capacity(stmt.table_names().len());
291 for table_name_stmt in stmt.table_names() {
292 let (catalog, schema, table) =
293 table_idents_to_full_name(table_name_stmt, &query_ctx)
294 .map_err(BoxedError::new)
295 .context(ExternalSnafu)?;
296 table_names.push(TableName::new(catalog, schema, table));
297 }
298 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
299 .await
300 }
301 Statement::DropDatabase(stmt) => {
302 self.drop_database(
303 query_ctx.current_catalog().to_string(),
304 format_raw_object_name(stmt.name()),
305 stmt.drop_if_exists(),
306 query_ctx,
307 )
308 .await
309 }
310 Statement::TruncateTable(stmt) => {
311 let (catalog, schema, table) =
312 table_idents_to_full_name(stmt.table_name(), &query_ctx)
313 .map_err(BoxedError::new)
314 .context(ExternalSnafu)?;
315 let table_name = TableName::new(catalog, schema, table);
316 let time_ranges = self
317 .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
318 .await?;
319 self.truncate_table(table_name, time_ranges, query_ctx)
320 .await
321 }
322 Statement::CreateDatabase(stmt) => {
323 self.create_database(
324 &format_raw_object_name(&stmt.name),
325 stmt.if_not_exists,
326 stmt.options.into_map(),
327 query_ctx,
328 )
329 .await
330 }
331 Statement::ShowCreateDatabase(show) => {
332 let (catalog, database) =
333 idents_to_full_database_name(&show.database_name, &query_ctx)
334 .map_err(BoxedError::new)
335 .context(ExternalSnafu)?;
336 let table_metadata_manager = self
337 .catalog_manager
338 .as_any()
339 .downcast_ref::<KvBackendCatalogManager>()
340 .map(|manager| manager.table_metadata_manager_ref().clone())
341 .context(UpgradeCatalogManagerRefSnafu)?;
342 let opts: HashMap<String, String> = table_metadata_manager
343 .schema_manager()
344 .get(SchemaNameKey::new(&catalog, &database))
345 .await
346 .context(TableMetadataManagerSnafu)?
347 .context(SchemaNotFoundSnafu {
348 schema_info: &database,
349 })?
350 .into_inner()
351 .into();
352
353 self.show_create_database(&database, opts.into()).await
354 }
355 Statement::ShowCreateTable(show) => {
356 let (catalog, schema, table) =
357 table_idents_to_full_name(&show.table_name, &query_ctx)
358 .map_err(BoxedError::new)
359 .context(ExternalSnafu)?;
360
361 let table_ref = self
362 .catalog_manager
363 .table(&catalog, &schema, &table, Some(&query_ctx))
364 .await
365 .context(CatalogSnafu)?
366 .context(TableNotFoundSnafu { table_name: &table })?;
367 let table_name = TableName::new(catalog, schema, table);
368
369 match show.variant {
370 ShowCreateTableVariant::Original => {
371 self.show_create_table(table_name, table_ref, query_ctx)
372 .await
373 }
374 ShowCreateTableVariant::PostgresForeignTable => {
375 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
376 .await
377 }
378 }
379 }
380 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
381 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
382 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
383 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
384 Statement::ShowColumns(show_columns) => {
385 self.show_columns(show_columns, query_ctx).await
386 }
387 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
388 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
389 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
390 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
391 Statement::Use(db) => self.use_database(db, query_ctx).await,
392 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
393 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
394 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
395 }
396 }
397
398 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
399 let catalog = query_ctx.current_catalog();
400 ensure!(
401 self.catalog_manager
402 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
403 .await
404 .context(CatalogSnafu)?,
405 SchemaNotFoundSnafu { schema_info: &db }
406 );
407
408 query_ctx.set_current_schema(&db);
409
410 Ok(Output::new_with_record_batches(RecordBatches::empty()))
411 }
412
413 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
414 let var_name = set_var.variable.to_string().to_uppercase();
415
416 match var_name.as_str() {
417 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
418
419 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
420 set_timezone(set_var.value, query_ctx)?
421 }
422
423 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
424
425 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
429
430 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
432
433 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
434 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
435 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
436 Channel::Postgres => {
437 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
438 }
439 _ => {
440 return NotSupportedSnafu {
441 feat: format!("Unsupported set variable {}", var_name),
442 }
443 .fail()
444 }
445 },
446 "STATEMENT_TIMEOUT" => {
447 if query_ctx.channel() == Channel::Postgres {
448 set_query_timeout(set_var.value, query_ctx)?
449 } else {
450 return NotSupportedSnafu {
451 feat: format!("Unsupported set variable {}", var_name),
452 }
453 .fail();
454 }
455 }
456 "SEARCH_PATH" => {
457 if query_ctx.channel() == Channel::Postgres {
458 set_search_path(set_var.value, query_ctx)?
459 } else {
460 return NotSupportedSnafu {
461 feat: format!("Unsupported set variable {}", var_name),
462 }
463 .fail();
464 }
465 }
466 _ => {
467 if query_ctx.channel() == Channel::Postgres {
472 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
473 } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
474 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
476 } else {
477 return NotSupportedSnafu {
478 feat: format!("Unsupported set variable {}", var_name),
479 }
480 .fail();
481 }
482 }
483 }
484 Ok(Output::new_with_affected_rows(0))
485 }
486
487 #[tracing::instrument(skip_all)]
488 pub async fn plan(
489 &self,
490 stmt: &QueryStatement,
491 query_ctx: QueryContextRef,
492 ) -> Result<LogicalPlan> {
493 self.query_engine
494 .planner()
495 .plan(stmt, query_ctx)
496 .await
497 .context(PlanStatementSnafu)
498 }
499
500 #[tracing::instrument(skip_all)]
502 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
503 self.query_engine
504 .execute(plan, query_ctx)
505 .await
506 .context(ExecLogicalPlanSnafu)
507 }
508
509 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
510 self.query_engine
511 .planner()
512 .optimize(plan)
513 .context(PlanStatementSnafu)
514 }
515
516 #[tracing::instrument(skip_all)]
517 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
518 let plan = self.plan(&stmt, query_ctx.clone()).await?;
519 self.exec_plan(plan, query_ctx).await
520 }
521
522 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
523 let TableReference {
524 catalog,
525 schema,
526 table,
527 } = table_ref;
528 self.catalog_manager
529 .table(catalog, schema, table, None)
530 .await
531 .context(CatalogSnafu)?
532 .with_context(|| TableNotFoundSnafu {
533 table_name: table_ref.to_string(),
534 })
535 }
536
537 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
538 &self.procedure_executor
539 }
540
541 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
542 &self.cache_invalidator
543 }
544
545 pub async fn convert_truncate_time_ranges(
548 &self,
549 table_name: &TableName,
550 sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
551 query_ctx: &QueryContextRef,
552 ) -> Result<Vec<(Timestamp, Timestamp)>> {
553 if sql_values_time_range.is_empty() {
554 return Ok(vec![]);
555 }
556 let table = self.get_table(&table_name.table_ref()).await?;
557 let info = table.table_info();
558 let time_index_dt = info
559 .meta
560 .schema
561 .timestamp_column()
562 .context(UnexpectedSnafu {
563 violated: "Table must have a timestamp column",
564 })?;
565
566 let time_unit = time_index_dt
567 .data_type
568 .as_timestamp()
569 .with_context(|| UnexpectedSnafu {
570 violated: format!(
571 "Table {}'s time index column must be a timestamp type, found: {:?}",
572 table_name, time_index_dt
573 ),
574 })?
575 .unit();
576
577 let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
578 for (start, end) in sql_values_time_range {
579 let start = common_sql::convert::sql_value_to_value(
580 "range_start",
581 &ConcreteDataType::timestamp_datatype(time_unit),
582 start,
583 Some(&query_ctx.timezone()),
584 None,
585 false,
586 )
587 .context(SqlCommonSnafu)
588 .and_then(|v| {
589 if let datatypes::value::Value::Timestamp(t) = v {
590 Ok(t)
591 } else {
592 error::InvalidSqlSnafu {
593 err_msg: format!("Expected a timestamp value, found {v:?}"),
594 }
595 .fail()
596 }
597 })?;
598
599 let end = common_sql::convert::sql_value_to_value(
600 "range_end",
601 &ConcreteDataType::timestamp_datatype(time_unit),
602 end,
603 Some(&query_ctx.timezone()),
604 None,
605 false,
606 )
607 .context(SqlCommonSnafu)
608 .and_then(|v| {
609 if let datatypes::value::Value::Timestamp(t) = v {
610 Ok(t)
611 } else {
612 error::InvalidSqlSnafu {
613 err_msg: format!("Expected a timestamp value, found {v:?}"),
614 }
615 .fail()
616 }
617 })?;
618 time_ranges.push((start, end));
619 }
620 Ok(time_ranges)
621 }
622
623 pub(crate) fn inserter(&self) -> &InserterRef {
625 &self.inserter
626 }
627}
628
629fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
630 let CopyQueryToArgument {
631 with,
632 connection,
633 location,
634 } = stmt;
635
636 Ok(CopyQueryToRequest {
637 location,
638 with: with.into_map(),
639 connection: connection.into_map(),
640 })
641}
642
643fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
644 let direction = match stmt {
645 CopyTable::To(_) => CopyDirection::Export,
646 CopyTable::From(_) => CopyDirection::Import,
647 };
648
649 let CopyTableArgument {
650 location,
651 connection,
652 with,
653 table_name,
654 limit,
655 ..
656 } = match stmt {
657 CopyTable::To(arg) => arg,
658 CopyTable::From(arg) => arg,
659 };
660 let (catalog_name, schema_name, table_name) =
661 table_idents_to_full_name(&table_name, &query_ctx)
662 .map_err(BoxedError::new)
663 .context(ExternalSnafu)?;
664
665 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
666
667 let pattern = with
668 .get(common_datasource::file_format::FILE_PATTERN)
669 .cloned();
670
671 Ok(CopyTableRequest {
672 catalog_name,
673 schema_name,
674 table_name,
675 location,
676 with: with.into_map(),
677 connection: connection.into_map(),
678 pattern,
679 direction,
680 timestamp_range,
681 limit,
682 })
683}
684
685fn to_copy_database_request(
688 arg: CopyDatabaseArgument,
689 query_ctx: &QueryContextRef,
690) -> Result<CopyDatabaseRequest> {
691 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
692 .map_err(BoxedError::new)
693 .context(ExternalSnafu)?;
694 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
695
696 Ok(CopyDatabaseRequest {
697 catalog_name,
698 schema_name: database_name,
699 location: arg.location,
700 with: arg.with.into_map(),
701 connection: arg.connection.into_map(),
702 time_range,
703 })
704}
705
706fn timestamp_range_from_option_map(
710 options: &OptionMap,
711 query_ctx: &QueryContextRef,
712) -> Result<Option<TimestampRange>> {
713 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
714 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
715 let time_range = match (start_timestamp, end_timestamp) {
716 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
717 error::InvalidTimestampRangeSnafu {
718 start: start.to_iso8601_string(),
719 end: end.to_iso8601_string(),
720 }
721 })?),
722 (Some(start), None) => Some(TimestampRange::from_start(start)),
723 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
725 };
726 Ok(time_range)
727}
728
729fn extract_timestamp(
731 map: &OptionMap,
732 key: &str,
733 query_ctx: &QueryContextRef,
734) -> Result<Option<Timestamp>> {
735 map.get(key)
736 .map(|v| {
737 Timestamp::from_str(v, Some(&query_ctx.timezone()))
738 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
739 })
740 .transpose()
741}
742
743fn idents_to_full_database_name(
744 obj_name: &ObjectName,
745 query_ctx: &QueryContextRef,
746) -> Result<(String, String)> {
747 match &obj_name.0[..] {
748 [database] => Ok((
749 query_ctx.current_catalog().to_owned(),
750 database.to_string_unquoted(),
751 )),
752 [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
753 _ => InvalidSqlSnafu {
754 err_msg: format!(
755 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
756 ),
757 }
758 .fail(),
759 }
760}
761
762pub struct InserterImpl {
764 statement_executor: StatementExecutorRef,
765 options: Option<InsertOptions>,
766}
767
768impl InserterImpl {
769 pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
770 Self {
771 statement_executor,
772 options,
773 }
774 }
775}
776
777#[async_trait::async_trait]
778impl Inserter for InserterImpl {
779 async fn insert_rows(
780 &self,
781 context: &client::inserter::Context<'_>,
782 requests: RowInsertRequests,
783 ) -> ClientResult<()> {
784 let mut ctx_builder = QueryContextBuilder::default()
785 .current_catalog(context.catalog.to_string())
786 .current_schema(context.schema.to_string());
787 if let Some(options) = self.options.as_ref() {
788 ctx_builder = ctx_builder
789 .set_extension(
790 TTL_KEY.to_string(),
791 format_duration(options.ttl).to_string(),
792 )
793 .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
794 }
795 let query_ctx = ctx_builder.build().into();
796
797 self.statement_executor
798 .inserter()
799 .handle_row_inserts(
800 requests,
801 query_ctx,
802 self.statement_executor.as_ref(),
803 false,
804 false,
805 )
806 .await
807 .map_err(BoxedError::new)
808 .context(ClientExternalSnafu)
809 .map(|_| ())
810 }
811
812 fn set_options(&mut self, options: &InsertOptions) {
813 self.options = Some(*options);
814 }
815}
816
817#[cfg(test)]
818mod tests {
819 use std::assert_matches::assert_matches;
820 use std::collections::HashMap;
821
822 use common_time::range::TimestampRange;
823 use common_time::{Timestamp, Timezone};
824 use session::context::QueryContextBuilder;
825 use sql::statements::OptionMap;
826
827 use crate::error;
828 use crate::statement::copy_database::{
829 COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
830 };
831 use crate::statement::timestamp_range_from_option_map;
832
833 fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
834 let query_ctx = QueryContextBuilder::default()
835 .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
836 .build()
837 .into();
838 let map = OptionMap::from(
839 [
840 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
841 (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
842 ]
843 .into_iter()
844 .collect::<HashMap<_, _>>(),
845 );
846 timestamp_range_from_option_map(&map, &query_ctx)
847 }
848
849 #[test]
850 fn test_timestamp_range_from_option_map() {
851 assert_eq!(
852 Some(
853 TimestampRange::new(
854 Timestamp::new_second(1649635200),
855 Timestamp::new_second(1649664000),
856 )
857 .unwrap(),
858 ),
859 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
860 );
861
862 assert_matches!(
863 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
864 error::Error::InvalidTimestampRange { .. }
865 );
866 }
867}