1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::v1::RowInsertRequests;
33use catalog::CatalogManagerRef;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::process_manager::ProcessManagerRef;
36use client::RecordBatches;
37use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
38use client::inserter::{InsertOptions, Inserter};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::{debug, tracing, warn};
50use common_time::Timestamp;
51use common_time::range::TimestampRange;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use humantime::format_duration;
55use itertools::Itertools;
56use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
57use query::QueryEngineRef;
58use query::parser::QueryStatement;
59use session::context::{Channel, QueryContextBuilder, QueryContextRef};
60use session::table_name::table_idents_to_full_name;
61use set::{set_query_timeout, set_read_preference};
62use snafu::{OptionExt, ResultExt, ensure};
63use sql::ast::ObjectNamePartExt;
64use sql::statements::OptionMap;
65use sql::statements::copy::{
66 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
67};
68use sql::statements::set_variables::SetVariables;
69use sql::statements::show::ShowCreateTableVariant;
70use sql::statements::statement::Statement;
71use sql::util::format_raw_object_name;
72use sqlparser::ast::ObjectName;
73use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
74use table::TableRef;
75use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
76use table::table_name::TableName;
77use table::table_reference::TableReference;
78
79use self::set::{
80 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
81};
82use crate::error::{
83 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
84 PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
85 TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
86};
87use crate::insert::InserterRef;
88use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
89use crate::statement::set::set_allow_query_fallback;
90
91#[async_trait::async_trait]
93pub trait StatementExecutorConfigurator: Send + Sync {
94 async fn configure(
95 &self,
96 executor: StatementExecutor,
97 ctx: ExecutorConfigureContext,
98 ) -> std::result::Result<StatementExecutor, BoxedError>;
99}
100
101pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
102
103pub struct ExecutorConfigureContext {
104 pub kv_backend: KvBackendRef,
105}
106
107#[derive(Clone)]
108pub struct StatementExecutor {
109 catalog_manager: CatalogManagerRef,
110 query_engine: QueryEngineRef,
111 procedure_executor: ProcedureExecutorRef,
112 table_metadata_manager: TableMetadataManagerRef,
113 flow_metadata_manager: FlowMetadataManagerRef,
114 view_info_manager: ViewInfoManagerRef,
115 partition_manager: PartitionRuleManagerRef,
116 cache_invalidator: CacheInvalidatorRef,
117 inserter: InserterRef,
118 process_manager: Option<ProcessManagerRef>,
119 #[cfg(feature = "enterprise")]
120 trigger_querier: Option<TriggerQuerierRef>,
121}
122
123pub type StatementExecutorRef = Arc<StatementExecutor>;
124
125#[cfg(feature = "enterprise")]
127#[async_trait::async_trait]
128pub trait TriggerQuerier: Send + Sync {
129 async fn show_create_trigger(
131 &self,
132 catalog: &str,
133 trigger: &str,
134 query_ctx: &QueryContextRef,
135 ) -> std::result::Result<Output, BoxedError>;
136
137 fn as_any(&self) -> &dyn std::any::Any;
138}
139
140#[cfg(feature = "enterprise")]
141pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
142
143impl StatementExecutor {
144 #[allow(clippy::too_many_arguments)]
145 pub fn new(
146 catalog_manager: CatalogManagerRef,
147 query_engine: QueryEngineRef,
148 procedure_executor: ProcedureExecutorRef,
149 kv_backend: KvBackendRef,
150 cache_invalidator: CacheInvalidatorRef,
151 inserter: InserterRef,
152 table_route_cache: TableRouteCacheRef,
153 process_manager: Option<ProcessManagerRef>,
154 ) -> Self {
155 Self {
156 catalog_manager,
157 query_engine,
158 procedure_executor,
159 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
160 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
161 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
162 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
163 cache_invalidator,
164 inserter,
165 process_manager,
166 #[cfg(feature = "enterprise")]
167 trigger_querier: None,
168 }
169 }
170
171 #[cfg(feature = "enterprise")]
172 pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
173 self.trigger_querier = Some(querier);
174 self
175 }
176
177 #[cfg(feature = "testing")]
178 pub async fn execute_stmt(
179 &self,
180 stmt: QueryStatement,
181 query_ctx: QueryContextRef,
182 ) -> Result<Output> {
183 match stmt {
184 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
185 QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
186 }
187 }
188
189 #[tracing::instrument(skip_all)]
190 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
191 match stmt {
192 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
193 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
194 }
195
196 Statement::DeclareCursor(declare_cursor) => {
197 self.declare_cursor(declare_cursor, query_ctx).await
198 }
199 Statement::FetchCursor(fetch_cursor) => {
200 self.fetch_cursor(fetch_cursor, query_ctx).await
201 }
202 Statement::CloseCursor(close_cursor) => {
203 self.close_cursor(close_cursor, query_ctx).await
204 }
205
206 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
207
208 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
209
210 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
211
212 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
213
214 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
215
216 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
217
218 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
219
220 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
221
222 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
223
224 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
225
226 #[cfg(feature = "enterprise")]
227 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
228
229 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
230 let query_output = self
231 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
232 .await?;
233 let req = to_copy_query_request(stmt.arg)?;
234
235 self.copy_query_to(req, query_output)
236 .await
237 .map(Output::new_with_affected_rows)
238 }
239
240 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
241 let req = to_copy_table_request(stmt, query_ctx.clone())?;
242 match req.direction {
243 CopyDirection::Export => self
244 .copy_table_to(req, query_ctx)
245 .await
246 .map(Output::new_with_affected_rows),
247 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
248 }
249 }
250
251 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
252 match copy_database {
253 CopyDatabase::To(arg) => {
254 self.copy_database_to(
255 to_copy_database_request(arg, &query_ctx)?,
256 query_ctx.clone(),
257 )
258 .await
259 }
260 CopyDatabase::From(arg) => {
261 self.copy_database_from(
262 to_copy_database_request(arg, &query_ctx)?,
263 query_ctx,
264 )
265 .await
266 }
267 }
268 }
269
270 Statement::CreateTable(stmt) => {
271 let _ = self.create_table(stmt, query_ctx).await?;
272 Ok(Output::new_with_affected_rows(0))
273 }
274 Statement::CreateTableLike(stmt) => {
275 let _ = self.create_table_like(stmt, query_ctx).await?;
276 Ok(Output::new_with_affected_rows(0))
277 }
278 Statement::CreateExternalTable(stmt) => {
279 let _ = self.create_external_table(stmt, query_ctx).await?;
280 Ok(Output::new_with_affected_rows(0))
281 }
282 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
283 #[cfg(feature = "enterprise")]
284 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
285 Statement::DropFlow(stmt) => {
286 self.drop_flow(
287 query_ctx.current_catalog().to_string(),
288 format_raw_object_name(stmt.flow_name()),
289 stmt.drop_if_exists(),
290 query_ctx,
291 )
292 .await
293 }
294 #[cfg(feature = "enterprise")]
295 Statement::DropTrigger(stmt) => {
296 self.drop_trigger(
297 query_ctx.current_catalog().to_string(),
298 format_raw_object_name(stmt.trigger_name()),
299 stmt.drop_if_exists(),
300 query_ctx,
301 )
302 .await
303 }
304 Statement::CreateView(stmt) => {
305 let _ = self.create_view(stmt, query_ctx).await?;
306 Ok(Output::new_with_affected_rows(0))
307 }
308 Statement::DropView(stmt) => {
309 let (catalog_name, schema_name, view_name) =
310 table_idents_to_full_name(&stmt.view_name, &query_ctx)
311 .map_err(BoxedError::new)
312 .context(ExternalSnafu)?;
313
314 self.drop_view(
315 catalog_name,
316 schema_name,
317 view_name,
318 stmt.drop_if_exists,
319 query_ctx,
320 )
321 .await
322 }
323 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
324
325 Statement::AlterDatabase(alter_database) => {
326 self.alter_database(alter_database, query_ctx).await
327 }
328
329 #[cfg(feature = "enterprise")]
330 Statement::AlterTrigger(alter_trigger) => {
331 self.alter_trigger(alter_trigger, query_ctx).await
332 }
333
334 Statement::DropTable(stmt) => {
335 let mut table_names = Vec::with_capacity(stmt.table_names().len());
336 for table_name_stmt in stmt.table_names() {
337 let (catalog, schema, table) =
338 table_idents_to_full_name(table_name_stmt, &query_ctx)
339 .map_err(BoxedError::new)
340 .context(ExternalSnafu)?;
341 table_names.push(TableName::new(catalog, schema, table));
342 }
343 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
344 .await
345 }
346 Statement::DropDatabase(stmt) => {
347 self.drop_database(
348 query_ctx.current_catalog().to_string(),
349 format_raw_object_name(stmt.name()),
350 stmt.drop_if_exists(),
351 query_ctx,
352 )
353 .await
354 }
355 Statement::TruncateTable(stmt) => {
356 let (catalog, schema, table) =
357 table_idents_to_full_name(stmt.table_name(), &query_ctx)
358 .map_err(BoxedError::new)
359 .context(ExternalSnafu)?;
360 let table_name = TableName::new(catalog, schema, table);
361 let time_ranges = self
362 .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
363 .await?;
364 self.truncate_table(table_name, time_ranges, query_ctx)
365 .await
366 }
367 Statement::CreateDatabase(stmt) => {
368 self.create_database(
369 &format_raw_object_name(&stmt.name),
370 stmt.if_not_exists,
371 stmt.options.into_map(),
372 query_ctx,
373 )
374 .await
375 }
376 Statement::ShowCreateDatabase(show) => {
377 let (catalog, database) =
378 idents_to_full_database_name(&show.database_name, &query_ctx)
379 .map_err(BoxedError::new)
380 .context(ExternalSnafu)?;
381 let table_metadata_manager = self
382 .catalog_manager
383 .as_any()
384 .downcast_ref::<KvBackendCatalogManager>()
385 .map(|manager| manager.table_metadata_manager_ref().clone())
386 .context(UpgradeCatalogManagerRefSnafu)?;
387 let opts: HashMap<String, String> = table_metadata_manager
388 .schema_manager()
389 .get(SchemaNameKey::new(&catalog, &database))
390 .await
391 .context(TableMetadataManagerSnafu)?
392 .context(SchemaNotFoundSnafu {
393 schema_info: &database,
394 })?
395 .into_inner()
396 .into();
397
398 self.show_create_database(&database, opts.into()).await
399 }
400 Statement::ShowCreateTable(show) => {
401 let (catalog, schema, table) =
402 table_idents_to_full_name(&show.table_name, &query_ctx)
403 .map_err(BoxedError::new)
404 .context(ExternalSnafu)?;
405
406 let table_ref = self
407 .catalog_manager
408 .table(&catalog, &schema, &table, Some(&query_ctx))
409 .await
410 .context(CatalogSnafu)?
411 .context(TableNotFoundSnafu { table_name: &table })?;
412 let table_name = TableName::new(catalog, schema, table);
413
414 match show.variant {
415 ShowCreateTableVariant::Original => {
416 self.show_create_table(table_name, table_ref, query_ctx)
417 .await
418 }
419 ShowCreateTableVariant::PostgresForeignTable => {
420 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
421 .await
422 }
423 }
424 }
425 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
426 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
427 #[cfg(feature = "enterprise")]
428 Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
429 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
430 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
431 Statement::ShowColumns(show_columns) => {
432 self.show_columns(show_columns, query_ctx).await
433 }
434 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
435 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
436 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
437 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
438 Statement::Use(db) => self.use_database(db, query_ctx).await,
439 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
440 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
441 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
442 }
443 }
444
445 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
446 let catalog = query_ctx.current_catalog();
447 ensure!(
448 self.catalog_manager
449 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
450 .await
451 .context(CatalogSnafu)?,
452 SchemaNotFoundSnafu { schema_info: &db }
453 );
454
455 query_ctx.set_current_schema(&db);
456
457 Ok(Output::new_with_record_batches(RecordBatches::empty()))
458 }
459
460 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
461 let var_name = set_var.variable.to_string().to_uppercase();
462
463 debug!(
464 "Trying to set {}={} for session: {} ",
465 var_name,
466 set_var.value.iter().map(|e| e.to_string()).join(", "),
467 query_ctx.conn_info()
468 );
469
470 match var_name.as_str() {
471 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
472
473 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
474 set_timezone(set_var.value, query_ctx)?
475 }
476
477 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
478
479 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
483
484 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
486
487 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
488 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
489 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
490 Channel::Postgres => {
491 warn!(
492 "Unsupported set variable {} for channel {:?}",
493 var_name,
494 query_ctx.channel()
495 );
496 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
497 }
498 _ => {
499 return NotSupportedSnafu {
500 feat: format!("Unsupported set variable {}", var_name),
501 }
502 .fail();
503 }
504 },
505 "STATEMENT_TIMEOUT" => match query_ctx.channel() {
506 Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
507 Channel::Mysql => {
508 warn!(
509 "Unsupported set variable {} for channel {:?}",
510 var_name,
511 query_ctx.channel()
512 );
513 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
514 }
515 _ => {
516 return NotSupportedSnafu {
517 feat: format!("Unsupported set variable {}", var_name),
518 }
519 .fail();
520 }
521 },
522 "SEARCH_PATH" => {
523 if query_ctx.channel() == Channel::Postgres {
524 set_search_path(set_var.value, query_ctx)?
525 } else {
526 return NotSupportedSnafu {
527 feat: format!("Unsupported set variable {}", var_name),
528 }
529 .fail();
530 }
531 }
532 _ => {
533 if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
534 {
535 warn!(
539 "Unsupported set variable {} for channel {:?}",
540 var_name,
541 query_ctx.channel()
542 );
543 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
544 } else {
545 return NotSupportedSnafu {
546 feat: format!("Unsupported set variable {}", var_name),
547 }
548 .fail();
549 }
550 }
551 }
552 Ok(Output::new_with_affected_rows(0))
553 }
554
555 #[tracing::instrument(skip_all)]
556 pub async fn plan(
557 &self,
558 stmt: &QueryStatement,
559 query_ctx: QueryContextRef,
560 ) -> Result<LogicalPlan> {
561 self.query_engine
562 .planner()
563 .plan(stmt, query_ctx)
564 .await
565 .context(PlanStatementSnafu)
566 }
567
568 #[tracing::instrument(skip_all)]
570 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
571 self.query_engine
572 .execute(plan, query_ctx)
573 .await
574 .context(ExecLogicalPlanSnafu)
575 }
576
577 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
578 self.query_engine
579 .planner()
580 .optimize(plan)
581 .context(PlanStatementSnafu)
582 }
583
584 #[tracing::instrument(skip_all)]
585 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
586 let plan = self.plan(&stmt, query_ctx.clone()).await?;
587 self.exec_plan(plan, query_ctx).await
588 }
589
590 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
591 let TableReference {
592 catalog,
593 schema,
594 table,
595 } = table_ref;
596 self.catalog_manager
597 .table(catalog, schema, table, None)
598 .await
599 .context(CatalogSnafu)?
600 .with_context(|| TableNotFoundSnafu {
601 table_name: table_ref.to_string(),
602 })
603 }
604
605 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
606 &self.procedure_executor
607 }
608
609 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
610 &self.cache_invalidator
611 }
612
613 pub async fn convert_truncate_time_ranges(
616 &self,
617 table_name: &TableName,
618 sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
619 query_ctx: &QueryContextRef,
620 ) -> Result<Vec<(Timestamp, Timestamp)>> {
621 if sql_values_time_range.is_empty() {
622 return Ok(vec![]);
623 }
624 let table = self.get_table(&table_name.table_ref()).await?;
625 let info = table.table_info();
626 let time_index_dt = info
627 .meta
628 .schema
629 .timestamp_column()
630 .context(UnexpectedSnafu {
631 violated: "Table must have a timestamp column",
632 })?;
633
634 let time_unit = time_index_dt
635 .data_type
636 .as_timestamp()
637 .with_context(|| UnexpectedSnafu {
638 violated: format!(
639 "Table {}'s time index column must be a timestamp type, found: {:?}",
640 table_name, time_index_dt
641 ),
642 })?
643 .unit();
644
645 let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
646 for (start, end) in sql_values_time_range {
647 let start = common_sql::convert::sql_value_to_value(
648 "range_start",
649 &ConcreteDataType::timestamp_datatype(time_unit),
650 start,
651 Some(&query_ctx.timezone()),
652 None,
653 false,
654 )
655 .context(SqlCommonSnafu)
656 .and_then(|v| {
657 if let datatypes::value::Value::Timestamp(t) = v {
658 Ok(t)
659 } else {
660 error::InvalidSqlSnafu {
661 err_msg: format!("Expected a timestamp value, found {v:?}"),
662 }
663 .fail()
664 }
665 })?;
666
667 let end = common_sql::convert::sql_value_to_value(
668 "range_end",
669 &ConcreteDataType::timestamp_datatype(time_unit),
670 end,
671 Some(&query_ctx.timezone()),
672 None,
673 false,
674 )
675 .context(SqlCommonSnafu)
676 .and_then(|v| {
677 if let datatypes::value::Value::Timestamp(t) = v {
678 Ok(t)
679 } else {
680 error::InvalidSqlSnafu {
681 err_msg: format!("Expected a timestamp value, found {v:?}"),
682 }
683 .fail()
684 }
685 })?;
686 time_ranges.push((start, end));
687 }
688 Ok(time_ranges)
689 }
690
691 pub(crate) fn inserter(&self) -> &InserterRef {
693 &self.inserter
694 }
695}
696
697fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
698 let CopyQueryToArgument {
699 with,
700 connection,
701 location,
702 } = stmt;
703
704 Ok(CopyQueryToRequest {
705 location,
706 with: with.into_map(),
707 connection: connection.into_map(),
708 })
709}
710
711fn verify_time_related_format(with: &OptionMap) -> Result<()> {
713 let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
714 let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
715 let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
716 let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
717
718 if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
719 ensure!(
720 time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
721 error::TimestampFormatNotSupportedSnafu {
722 format: "<unknown>".to_string(),
723 file_format: file_format.unwrap_or_default(),
724 }
725 );
726 }
727
728 for (key, format_opt) in [
729 (common_datasource::file_format::TIME_FORMAT, time_format),
730 (common_datasource::file_format::DATE_FORMAT, date_format),
731 (
732 common_datasource::file_format::TIMESTAMP_FORMAT,
733 timestamp_format,
734 ),
735 ] {
736 if let Some(format) = format_opt {
737 chrono::format::strftime::StrftimeItems::new(format)
738 .parse()
739 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
740 }
741 }
742
743 Ok(())
744}
745
746fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
747 let direction = match stmt {
748 CopyTable::To(_) => CopyDirection::Export,
749 CopyTable::From(_) => CopyDirection::Import,
750 };
751
752 let CopyTableArgument {
753 location,
754 connection,
755 with,
756 table_name,
757 limit,
758 ..
759 } = match stmt {
760 CopyTable::To(arg) => arg,
761 CopyTable::From(arg) => arg,
762 };
763 let (catalog_name, schema_name, table_name) =
764 table_idents_to_full_name(&table_name, &query_ctx)
765 .map_err(BoxedError::new)
766 .context(ExternalSnafu)?;
767
768 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
769
770 verify_time_related_format(&with)?;
771
772 let pattern = with
773 .get(common_datasource::file_format::FILE_PATTERN)
774 .map(|x| x.to_string());
775
776 Ok(CopyTableRequest {
777 catalog_name,
778 schema_name,
779 table_name,
780 location,
781 with: with.into_map(),
782 connection: connection.into_map(),
783 pattern,
784 direction,
785 timestamp_range,
786 limit,
787 })
788}
789
790fn to_copy_database_request(
793 arg: CopyDatabaseArgument,
794 query_ctx: &QueryContextRef,
795) -> Result<CopyDatabaseRequest> {
796 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
797 .map_err(BoxedError::new)
798 .context(ExternalSnafu)?;
799 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
800
801 Ok(CopyDatabaseRequest {
802 catalog_name,
803 schema_name: database_name,
804 location: arg.location,
805 with: arg.with.into_map(),
806 connection: arg.connection.into_map(),
807 time_range,
808 })
809}
810
811fn timestamp_range_from_option_map(
815 options: &OptionMap,
816 query_ctx: &QueryContextRef,
817) -> Result<Option<TimestampRange>> {
818 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
819 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
820 let time_range = match (start_timestamp, end_timestamp) {
821 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
822 error::InvalidTimestampRangeSnafu {
823 start: start.to_iso8601_string(),
824 end: end.to_iso8601_string(),
825 }
826 })?),
827 (Some(start), None) => Some(TimestampRange::from_start(start)),
828 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
830 };
831 Ok(time_range)
832}
833
834fn extract_timestamp(
836 map: &OptionMap,
837 key: &str,
838 query_ctx: &QueryContextRef,
839) -> Result<Option<Timestamp>> {
840 map.get(key)
841 .map(|v| {
842 Timestamp::from_str(v, Some(&query_ctx.timezone()))
843 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
844 })
845 .transpose()
846}
847
848fn idents_to_full_database_name(
849 obj_name: &ObjectName,
850 query_ctx: &QueryContextRef,
851) -> Result<(String, String)> {
852 match &obj_name.0[..] {
853 [database] => Ok((
854 query_ctx.current_catalog().to_owned(),
855 database.to_string_unquoted(),
856 )),
857 [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
858 _ => InvalidSqlSnafu {
859 err_msg: format!(
860 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
861 ),
862 }
863 .fail(),
864 }
865}
866
867pub struct InserterImpl {
869 statement_executor: StatementExecutorRef,
870 options: Option<InsertOptions>,
871}
872
873impl InserterImpl {
874 pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
875 Self {
876 statement_executor,
877 options,
878 }
879 }
880}
881
882#[async_trait::async_trait]
883impl Inserter for InserterImpl {
884 async fn insert_rows(
885 &self,
886 context: &client::inserter::Context<'_>,
887 requests: RowInsertRequests,
888 ) -> ClientResult<()> {
889 let mut ctx_builder = QueryContextBuilder::default()
890 .current_catalog(context.catalog.to_string())
891 .current_schema(context.schema.to_string());
892 if let Some(options) = self.options.as_ref() {
893 ctx_builder = ctx_builder
894 .set_extension(
895 TTL_KEY.to_string(),
896 format_duration(options.ttl).to_string(),
897 )
898 .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
899 }
900 let query_ctx = ctx_builder.build().into();
901
902 self.statement_executor
903 .inserter()
904 .handle_row_inserts(
905 requests,
906 query_ctx,
907 self.statement_executor.as_ref(),
908 false,
909 false,
910 )
911 .await
912 .map_err(BoxedError::new)
913 .context(ClientExternalSnafu)
914 .map(|_| ())
915 }
916
917 fn set_options(&mut self, options: &InsertOptions) {
918 self.options = Some(*options);
919 }
920}
921
922#[cfg(test)]
923mod tests {
924 use std::assert_matches::assert_matches;
925 use std::collections::HashMap;
926
927 use common_time::range::TimestampRange;
928 use common_time::{Timestamp, Timezone};
929 use session::context::QueryContextBuilder;
930 use sql::statements::OptionMap;
931
932 use crate::error;
933 use crate::statement::copy_database::{
934 COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
935 };
936 use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};
937
938 fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
939 let query_ctx = QueryContextBuilder::default()
940 .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
941 .build()
942 .into();
943 let map = OptionMap::from(
944 [
945 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
946 (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
947 ]
948 .into_iter()
949 .collect::<HashMap<_, _>>(),
950 );
951 timestamp_range_from_option_map(&map, &query_ctx)
952 }
953
954 #[test]
955 fn test_timestamp_range_from_option_map() {
956 assert_eq!(
957 Some(
958 TimestampRange::new(
959 Timestamp::new_second(1649635200),
960 Timestamp::new_second(1649664000),
961 )
962 .unwrap(),
963 ),
964 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
965 );
966
967 assert_matches!(
968 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
969 error::Error::InvalidTimestampRange { .. }
970 );
971 }
972
973 #[test]
974 fn test_verify_timestamp_format() {
975 let map = OptionMap::from(
976 [
977 (
978 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
979 "%Y-%m-%d %H:%M:%S".to_string(),
980 ),
981 (
982 common_datasource::file_format::FORMAT_TYPE.to_string(),
983 "csv".to_string(),
984 ),
985 ]
986 .into_iter()
987 .collect::<HashMap<_, _>>(),
988 );
989 assert!(verify_time_related_format(&map).is_ok());
990
991 let map = OptionMap::from(
992 [
993 (
994 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
995 "%Y-%m-%d %H:%M:%S".to_string(),
996 ),
997 (
998 common_datasource::file_format::FORMAT_TYPE.to_string(),
999 "json".to_string(),
1000 ),
1001 ]
1002 .into_iter()
1003 .collect::<HashMap<_, _>>(),
1004 );
1005
1006 assert_matches!(
1007 verify_time_related_format(&map).unwrap_err(),
1008 error::Error::TimestampFormatNotSupported { .. }
1009 );
1010 let map = OptionMap::from(
1011 [
1012 (
1013 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
1014 "%111112".to_string(),
1015 ),
1016 (
1017 common_datasource::file_format::FORMAT_TYPE.to_string(),
1018 "csv".to_string(),
1019 ),
1020 ]
1021 .into_iter()
1022 .collect::<HashMap<_, _>>(),
1023 );
1024
1025 assert_matches!(
1026 verify_time_related_format(&map).unwrap_err(),
1027 error::Error::InvalidCopyParameter { .. }
1028 );
1029 }
1030}