1mod admin;
16mod comment;
17mod copy_database;
18mod copy_query_to;
19mod copy_table_from;
20mod copy_table_to;
21mod cursor;
22pub mod ddl;
23mod describe;
24mod dml;
25mod kill;
26mod set;
27mod show;
28mod tql;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32
33use api::v1::RowInsertRequests;
34use catalog::CatalogManagerRef;
35use catalog::kvbackend::KvBackendCatalogManager;
36use catalog::process_manager::ProcessManagerRef;
37use client::RecordBatches;
38use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
39use client::inserter::{InsertOptions, Inserter};
40use common_error::ext::BoxedError;
41use common_meta::cache::TableRouteCacheRef;
42use common_meta::cache_invalidator::CacheInvalidatorRef;
43use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
44use common_meta::key::schema_name::SchemaNameKey;
45use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
46use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
47use common_meta::kv_backend::KvBackendRef;
48use common_meta::procedure_executor::ProcedureExecutorRef;
49use common_query::Output;
50use common_telemetry::{debug, tracing, warn};
51use common_time::Timestamp;
52use common_time::range::TimestampRange;
53use datafusion_expr::LogicalPlan;
54use datatypes::prelude::ConcreteDataType;
55use datatypes::schema::ColumnSchema;
56use humantime::format_duration;
57use itertools::Itertools;
58use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
59use query::QueryEngineRef;
60use query::parser::QueryStatement;
61use session::context::{Channel, QueryContextBuilder, QueryContextRef};
62use session::table_name::table_idents_to_full_name;
63use set::{set_query_timeout, set_read_preference};
64use snafu::{OptionExt, ResultExt, ensure};
65use sql::ast::ObjectNamePartExt;
66use sql::statements::OptionMap;
67use sql::statements::copy::{
68 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
69};
70use sql::statements::set_variables::SetVariables;
71use sql::statements::show::ShowCreateTableVariant;
72use sql::statements::statement::Statement;
73use sql::util::format_raw_object_name;
74use sqlparser::ast::ObjectName;
75use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
76use table::TableRef;
77use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
78use table::table_name::TableName;
79use table::table_reference::TableReference;
80
81use self::set::{
82 set_bytea_output, set_datestyle, set_intervalstyle, set_search_path, set_timezone,
83 validate_client_encoding,
84};
85use crate::error::{
86 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
87 PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
88 TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
89};
90use crate::insert::InserterRef;
91use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
92use crate::statement::set::set_allow_query_fallback;
93
/// Extension point that receives a [`StatementExecutor`] by value, together with an
/// [`ExecutorConfigureContext`], and returns a (possibly modified) executor.
///
/// Implementations must be `Send + Sync` so they can be shared behind an [`Arc`].
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
    /// Configures `executor` using `ctx` and returns the resulting executor.
    ///
    /// # Errors
    ///
    /// Returns a [`BoxedError`] if the configuration fails.
    async fn configure(
        &self,
        executor: StatementExecutor,
        ctx: ExecutorConfigureContext,
    ) -> std::result::Result<StatementExecutor, BoxedError>;
}

/// Shared handle to a [`StatementExecutorConfigurator`] trait object.
pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
105
/// Context handed to [`StatementExecutorConfigurator::configure`].
pub struct ExecutorConfigureContext {
    /// Key-value backend made available to the configurator.
    pub kv_backend: KvBackendRef,
}
109
/// Executes parsed SQL / PromQL statements by dispatching them either to the
/// query engine or to a dedicated statement handler.
#[derive(Clone)]
pub struct StatementExecutor {
    /// Catalog used to resolve schemas and tables.
    catalog_manager: CatalogManagerRef,
    /// Engine used to plan and execute query-like statements.
    query_engine: QueryEngineRef,
    /// Executor for procedure-based operations (DDL handlers live in other modules).
    procedure_executor: ProcedureExecutorRef,
    /// Table metadata manager; built from the kv backend passed to [`StatementExecutor::new`].
    table_metadata_manager: TableMetadataManagerRef,
    /// Flow metadata manager; built from the kv backend passed to [`StatementExecutor::new`].
    flow_metadata_manager: FlowMetadataManagerRef,
    /// View info manager; built from the kv backend passed to [`StatementExecutor::new`].
    view_info_manager: ViewInfoManagerRef,
    /// Partition rule manager; built from the kv backend and the table route cache.
    partition_manager: PartitionRuleManagerRef,
    /// Cache invalidator exposed through [`StatementExecutor::cache_invalidator`].
    cache_invalidator: CacheInvalidatorRef,
    /// Inserter used for row writes.
    inserter: InserterRef,
    /// Optional process manager (may be absent).
    process_manager: Option<ProcessManagerRef>,
    /// Optional trigger querier; `None` unless set via `with_trigger_querier`.
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}

/// Shared handle to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
127
/// Enterprise-only querier for trigger definitions.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Produces the output describing how `trigger` in `catalog` was created.
    ///
    /// NOTE(review): presumably backs `SHOW CREATE TRIGGER`; the handler that calls
    /// it lives outside this file — confirm.
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    /// Returns `self` as [`std::any::Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any;
}

/// Shared handle to a [`TriggerQuerier`] trait object.
#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
145
146impl StatementExecutor {
147 #[allow(clippy::too_many_arguments)]
148 pub fn new(
149 catalog_manager: CatalogManagerRef,
150 query_engine: QueryEngineRef,
151 procedure_executor: ProcedureExecutorRef,
152 kv_backend: KvBackendRef,
153 cache_invalidator: CacheInvalidatorRef,
154 inserter: InserterRef,
155 table_route_cache: TableRouteCacheRef,
156 process_manager: Option<ProcessManagerRef>,
157 ) -> Self {
158 Self {
159 catalog_manager,
160 query_engine,
161 procedure_executor,
162 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
163 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
164 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
165 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
166 cache_invalidator,
167 inserter,
168 process_manager,
169 #[cfg(feature = "enterprise")]
170 trigger_querier: None,
171 }
172 }
173
174 #[cfg(feature = "enterprise")]
175 pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
176 self.trigger_querier = Some(querier);
177 self
178 }
179
180 #[cfg(feature = "testing")]
181 pub async fn execute_stmt(
182 &self,
183 stmt: QueryStatement,
184 query_ctx: QueryContextRef,
185 ) -> Result<Output> {
186 match stmt {
187 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
188 QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
189 }
190 }
191
192 #[tracing::instrument(skip_all)]
193 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
194 match stmt {
195 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
196 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
197 }
198
199 Statement::DeclareCursor(declare_cursor) => {
200 self.declare_cursor(declare_cursor, query_ctx).await
201 }
202 Statement::FetchCursor(fetch_cursor) => {
203 self.fetch_cursor(fetch_cursor, query_ctx).await
204 }
205 Statement::CloseCursor(close_cursor) => {
206 self.close_cursor(close_cursor, query_ctx).await
207 }
208
209 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
210
211 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
212
213 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
214
215 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
216
217 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
218
219 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
220
221 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
222
223 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
224
225 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
226
227 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
228
229 #[cfg(feature = "enterprise")]
230 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
231
232 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
233 let query_output = self
234 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
235 .await?;
236 let req = to_copy_query_request(stmt.arg)?;
237
238 self.copy_query_to(req, query_output)
239 .await
240 .map(Output::new_with_affected_rows)
241 }
242
243 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
244 let req = to_copy_table_request(stmt, query_ctx.clone())?;
245 match req.direction {
246 CopyDirection::Export => self
247 .copy_table_to(req, query_ctx)
248 .await
249 .map(Output::new_with_affected_rows),
250 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
251 }
252 }
253
254 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
255 match copy_database {
256 CopyDatabase::To(arg) => {
257 self.copy_database_to(
258 to_copy_database_request(arg, &query_ctx)?,
259 query_ctx.clone(),
260 )
261 .await
262 }
263 CopyDatabase::From(arg) => {
264 self.copy_database_from(
265 to_copy_database_request(arg, &query_ctx)?,
266 query_ctx,
267 )
268 .await
269 }
270 }
271 }
272
273 Statement::CreateTable(stmt) => {
274 let _ = self.create_table(stmt, query_ctx).await?;
275 Ok(Output::new_with_affected_rows(0))
276 }
277 Statement::CreateTableLike(stmt) => {
278 let _ = self.create_table_like(stmt, query_ctx).await?;
279 Ok(Output::new_with_affected_rows(0))
280 }
281 Statement::CreateExternalTable(stmt) => {
282 let _ = self.create_external_table(stmt, query_ctx).await?;
283 Ok(Output::new_with_affected_rows(0))
284 }
285 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
286 #[cfg(feature = "enterprise")]
287 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
288 Statement::DropFlow(stmt) => {
289 self.drop_flow(
290 query_ctx.current_catalog().to_string(),
291 format_raw_object_name(stmt.flow_name()),
292 stmt.drop_if_exists(),
293 query_ctx,
294 )
295 .await
296 }
297 #[cfg(feature = "enterprise")]
298 Statement::DropTrigger(stmt) => {
299 self.drop_trigger(
300 query_ctx.current_catalog().to_string(),
301 format_raw_object_name(stmt.trigger_name()),
302 stmt.drop_if_exists(),
303 query_ctx,
304 )
305 .await
306 }
307 Statement::CreateView(stmt) => {
308 let _ = self.create_view(stmt, query_ctx).await?;
309 Ok(Output::new_with_affected_rows(0))
310 }
311 Statement::DropView(stmt) => {
312 let (catalog_name, schema_name, view_name) =
313 table_idents_to_full_name(&stmt.view_name, &query_ctx)
314 .map_err(BoxedError::new)
315 .context(ExternalSnafu)?;
316
317 self.drop_view(
318 catalog_name,
319 schema_name,
320 view_name,
321 stmt.drop_if_exists,
322 query_ctx,
323 )
324 .await
325 }
326 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
327
328 Statement::AlterDatabase(alter_database) => {
329 self.alter_database(alter_database, query_ctx).await
330 }
331
332 #[cfg(feature = "enterprise")]
333 Statement::AlterTrigger(alter_trigger) => {
334 self.alter_trigger(alter_trigger, query_ctx).await
335 }
336
337 Statement::DropTable(stmt) => {
338 let mut table_names = Vec::with_capacity(stmt.table_names().len());
339 for table_name_stmt in stmt.table_names() {
340 let (catalog, schema, table) =
341 table_idents_to_full_name(table_name_stmt, &query_ctx)
342 .map_err(BoxedError::new)
343 .context(ExternalSnafu)?;
344 table_names.push(TableName::new(catalog, schema, table));
345 }
346 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
347 .await
348 }
349 Statement::DropDatabase(stmt) => {
350 self.drop_database(
351 query_ctx.current_catalog().to_string(),
352 format_raw_object_name(stmt.name()),
353 stmt.drop_if_exists(),
354 query_ctx,
355 )
356 .await
357 }
358 Statement::TruncateTable(stmt) => {
359 let (catalog, schema, table) =
360 table_idents_to_full_name(stmt.table_name(), &query_ctx)
361 .map_err(BoxedError::new)
362 .context(ExternalSnafu)?;
363 let table_name = TableName::new(catalog, schema, table);
364 let time_ranges = self
365 .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
366 .await?;
367 self.truncate_table(table_name, time_ranges, query_ctx)
368 .await
369 }
370 Statement::CreateDatabase(stmt) => {
371 self.create_database(
372 &format_raw_object_name(&stmt.name),
373 stmt.if_not_exists,
374 stmt.options.into_map(),
375 query_ctx,
376 )
377 .await
378 }
379 Statement::ShowCreateDatabase(show) => {
380 let (catalog, database) =
381 idents_to_full_database_name(&show.database_name, &query_ctx)
382 .map_err(BoxedError::new)
383 .context(ExternalSnafu)?;
384 let table_metadata_manager = self
385 .catalog_manager
386 .as_any()
387 .downcast_ref::<KvBackendCatalogManager>()
388 .map(|manager| manager.table_metadata_manager_ref().clone())
389 .context(UpgradeCatalogManagerRefSnafu)?;
390 let opts: HashMap<String, String> = table_metadata_manager
391 .schema_manager()
392 .get(SchemaNameKey::new(&catalog, &database))
393 .await
394 .context(TableMetadataManagerSnafu)?
395 .context(SchemaNotFoundSnafu {
396 schema_info: &database,
397 })?
398 .into_inner()
399 .into();
400
401 self.show_create_database(&database, opts.into()).await
402 }
403 Statement::ShowCreateTable(show) => {
404 let (catalog, schema, table) =
405 table_idents_to_full_name(&show.table_name, &query_ctx)
406 .map_err(BoxedError::new)
407 .context(ExternalSnafu)?;
408
409 let table_ref = self
410 .catalog_manager
411 .table(&catalog, &schema, &table, Some(&query_ctx))
412 .await
413 .context(CatalogSnafu)?
414 .context(TableNotFoundSnafu { table_name: &table })?;
415 let table_name = TableName::new(catalog, schema, table);
416
417 match show.variant {
418 ShowCreateTableVariant::Original => {
419 self.show_create_table(table_name, table_ref, query_ctx)
420 .await
421 }
422 ShowCreateTableVariant::PostgresForeignTable => {
423 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
424 .await
425 }
426 }
427 }
428 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
429 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
430 #[cfg(feature = "enterprise")]
431 Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
432 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
433 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
434 Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
435 Statement::ShowColumns(show_columns) => {
436 self.show_columns(show_columns, query_ctx).await
437 }
438 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
439 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
440 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
441 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
442 Statement::Use(db) => self.use_database(db, query_ctx).await,
443 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
444 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
445 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
446 }
447 }
448
449 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
450 let catalog = query_ctx.current_catalog();
451 ensure!(
452 self.catalog_manager
453 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
454 .await
455 .context(CatalogSnafu)?,
456 SchemaNotFoundSnafu { schema_info: &db }
457 );
458
459 query_ctx.set_current_schema(&db);
460
461 Ok(Output::new_with_record_batches(RecordBatches::empty()))
462 }
463
464 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
465 let var_name = set_var.variable.to_string().to_uppercase();
466
467 debug!(
468 "Trying to set {}={} for session: {} ",
469 var_name,
470 set_var.value.iter().map(|e| e.to_string()).join(", "),
471 query_ctx.conn_info()
472 );
473
474 match var_name.as_str() {
475 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
476
477 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
478 set_timezone(set_var.value, query_ctx)?
479 }
480
481 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
482
483 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
487 "INTERVALSTYLE" => set_intervalstyle(set_var.value, query_ctx)?,
488
489 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
491
492 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
493 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
494 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
495 Channel::Postgres => {
496 warn!(
497 "Unsupported set variable {} for channel {:?}",
498 var_name,
499 query_ctx.channel()
500 );
501 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
502 }
503 _ => {
504 return NotSupportedSnafu {
505 feat: format!("Unsupported set variable {}", var_name),
506 }
507 .fail();
508 }
509 },
510 "STATEMENT_TIMEOUT" => match query_ctx.channel() {
511 Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
512 Channel::Mysql => {
513 warn!(
514 "Unsupported set variable {} for channel {:?}",
515 var_name,
516 query_ctx.channel()
517 );
518 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
519 }
520 _ => {
521 return NotSupportedSnafu {
522 feat: format!("Unsupported set variable {}", var_name),
523 }
524 .fail();
525 }
526 },
527 "SEARCH_PATH" => {
528 if query_ctx.channel() == Channel::Postgres {
529 set_search_path(set_var.value, query_ctx)?
530 } else {
531 return NotSupportedSnafu {
532 feat: format!("Unsupported set variable {}", var_name),
533 }
534 .fail();
535 }
536 }
537 _ => {
538 if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
539 {
540 warn!(
544 "Unsupported set variable {} for channel {:?}",
545 var_name,
546 query_ctx.channel()
547 );
548 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
549 } else {
550 return NotSupportedSnafu {
551 feat: format!("Unsupported set variable {}", var_name),
552 }
553 .fail();
554 }
555 }
556 }
557 Ok(Output::new_with_affected_rows(0))
558 }
559
560 #[tracing::instrument(skip_all)]
561 pub async fn plan(
562 &self,
563 stmt: &QueryStatement,
564 query_ctx: QueryContextRef,
565 ) -> Result<LogicalPlan> {
566 self.query_engine
567 .planner()
568 .plan(stmt, query_ctx)
569 .await
570 .context(PlanStatementSnafu)
571 }
572
573 #[tracing::instrument(skip_all)]
575 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
576 self.query_engine
577 .execute(plan, query_ctx)
578 .await
579 .context(ExecLogicalPlanSnafu)
580 }
581
582 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
583 self.query_engine
584 .planner()
585 .optimize(plan)
586 .context(PlanStatementSnafu)
587 }
588
589 #[tracing::instrument(skip_all)]
590 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
591 let plan = self.plan(&stmt, query_ctx.clone()).await?;
592 self.exec_plan(plan, query_ctx).await
593 }
594
595 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
596 let TableReference {
597 catalog,
598 schema,
599 table,
600 } = table_ref;
601 self.catalog_manager
602 .table(catalog, schema, table, None)
603 .await
604 .context(CatalogSnafu)?
605 .with_context(|| TableNotFoundSnafu {
606 table_name: table_ref.to_string(),
607 })
608 }
609
610 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
611 &self.procedure_executor
612 }
613
614 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
615 &self.cache_invalidator
616 }
617
618 pub async fn convert_truncate_time_ranges(
621 &self,
622 table_name: &TableName,
623 sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
624 query_ctx: &QueryContextRef,
625 ) -> Result<Vec<(Timestamp, Timestamp)>> {
626 if sql_values_time_range.is_empty() {
627 return Ok(vec![]);
628 }
629 let table = self.get_table(&table_name.table_ref()).await?;
630 let info = table.table_info();
631 let time_index_dt = info
632 .meta
633 .schema
634 .timestamp_column()
635 .context(UnexpectedSnafu {
636 violated: "Table must have a timestamp column",
637 })?;
638
639 let time_unit = time_index_dt
640 .data_type
641 .as_timestamp()
642 .with_context(|| UnexpectedSnafu {
643 violated: format!(
644 "Table {}'s time index column must be a timestamp type, found: {:?}",
645 table_name, time_index_dt
646 ),
647 })?
648 .unit();
649
650 let start_column = ColumnSchema::new(
651 "range_start",
652 ConcreteDataType::timestamp_datatype(time_unit),
653 false,
654 );
655 let end_column = ColumnSchema::new(
656 "range_end",
657 ConcreteDataType::timestamp_datatype(time_unit),
658 false,
659 );
660 let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
661 for (start, end) in sql_values_time_range {
662 let start = common_sql::convert::sql_value_to_value(
663 &start_column,
664 start,
665 Some(&query_ctx.timezone()),
666 None,
667 false,
668 )
669 .context(SqlCommonSnafu)
670 .and_then(|v| {
671 if let datatypes::value::Value::Timestamp(t) = v {
672 Ok(t)
673 } else {
674 error::InvalidSqlSnafu {
675 err_msg: format!("Expected a timestamp value, found {v:?}"),
676 }
677 .fail()
678 }
679 })?;
680
681 let end = common_sql::convert::sql_value_to_value(
682 &end_column,
683 end,
684 Some(&query_ctx.timezone()),
685 None,
686 false,
687 )
688 .context(SqlCommonSnafu)
689 .and_then(|v| {
690 if let datatypes::value::Value::Timestamp(t) = v {
691 Ok(t)
692 } else {
693 error::InvalidSqlSnafu {
694 err_msg: format!("Expected a timestamp value, found {v:?}"),
695 }
696 .fail()
697 }
698 })?;
699 time_ranges.push((start, end));
700 }
701 Ok(time_ranges)
702 }
703
704 pub(crate) fn inserter(&self) -> &InserterRef {
706 &self.inserter
707 }
708}
709
710fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
711 let CopyQueryToArgument {
712 with,
713 connection,
714 location,
715 } = stmt;
716
717 Ok(CopyQueryToRequest {
718 location,
719 with: with.into_map(),
720 connection: connection.into_map(),
721 })
722}
723
724fn verify_time_related_format(with: &OptionMap) -> Result<()> {
726 let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
727 let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
728 let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
729 let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
730
731 if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
732 ensure!(
733 time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
734 error::TimestampFormatNotSupportedSnafu {
735 format: "<unknown>".to_string(),
736 file_format: file_format.unwrap_or_default(),
737 }
738 );
739 }
740
741 for (key, format_opt) in [
742 (common_datasource::file_format::TIME_FORMAT, time_format),
743 (common_datasource::file_format::DATE_FORMAT, date_format),
744 (
745 common_datasource::file_format::TIMESTAMP_FORMAT,
746 timestamp_format,
747 ),
748 ] {
749 if let Some(format) = format_opt {
750 chrono::format::strftime::StrftimeItems::new(format)
751 .parse()
752 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
753 }
754 }
755
756 Ok(())
757}
758
759fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
760 let direction = match stmt {
761 CopyTable::To(_) => CopyDirection::Export,
762 CopyTable::From(_) => CopyDirection::Import,
763 };
764
765 let CopyTableArgument {
766 location,
767 connection,
768 with,
769 table_name,
770 limit,
771 ..
772 } = match stmt {
773 CopyTable::To(arg) => arg,
774 CopyTable::From(arg) => arg,
775 };
776 let (catalog_name, schema_name, table_name) =
777 table_idents_to_full_name(&table_name, &query_ctx)
778 .map_err(BoxedError::new)
779 .context(ExternalSnafu)?;
780
781 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
782
783 verify_time_related_format(&with)?;
784
785 let pattern = with
786 .get(common_datasource::file_format::FILE_PATTERN)
787 .map(|x| x.to_string());
788
789 Ok(CopyTableRequest {
790 catalog_name,
791 schema_name,
792 table_name,
793 location,
794 with: with.into_map(),
795 connection: connection.into_map(),
796 pattern,
797 direction,
798 timestamp_range,
799 limit,
800 })
801}
802
803fn to_copy_database_request(
806 arg: CopyDatabaseArgument,
807 query_ctx: &QueryContextRef,
808) -> Result<CopyDatabaseRequest> {
809 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
810 .map_err(BoxedError::new)
811 .context(ExternalSnafu)?;
812 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
813
814 Ok(CopyDatabaseRequest {
815 catalog_name,
816 schema_name: database_name,
817 location: arg.location,
818 with: arg.with.into_map(),
819 connection: arg.connection.into_map(),
820 time_range,
821 })
822}
823
824fn timestamp_range_from_option_map(
828 options: &OptionMap,
829 query_ctx: &QueryContextRef,
830) -> Result<Option<TimestampRange>> {
831 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
832 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
833 let time_range = match (start_timestamp, end_timestamp) {
834 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
835 error::InvalidTimestampRangeSnafu {
836 start: start.to_iso8601_string(),
837 end: end.to_iso8601_string(),
838 }
839 })?),
840 (Some(start), None) => Some(TimestampRange::from_start(start)),
841 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
843 };
844 Ok(time_range)
845}
846
847fn extract_timestamp(
849 map: &OptionMap,
850 key: &str,
851 query_ctx: &QueryContextRef,
852) -> Result<Option<Timestamp>> {
853 map.get(key)
854 .map(|v| {
855 Timestamp::from_str(v, Some(&query_ctx.timezone()))
856 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
857 })
858 .transpose()
859}
860
861fn idents_to_full_database_name(
862 obj_name: &ObjectName,
863 query_ctx: &QueryContextRef,
864) -> Result<(String, String)> {
865 match &obj_name.0[..] {
866 [database] => Ok((
867 query_ctx.current_catalog().to_owned(),
868 database.to_string_unquoted(),
869 )),
870 [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
871 _ => InvalidSqlSnafu {
872 err_msg: format!(
873 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
874 ),
875 }
876 .fail(),
877 }
878}
879
880pub struct InserterImpl {
882 statement_executor: StatementExecutorRef,
883 options: Option<InsertOptions>,
884}
885
886impl InserterImpl {
887 pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
888 Self {
889 statement_executor,
890 options,
891 }
892 }
893}
894
895#[async_trait::async_trait]
896impl Inserter for InserterImpl {
897 async fn insert_rows(
898 &self,
899 context: &client::inserter::Context<'_>,
900 requests: RowInsertRequests,
901 ) -> ClientResult<()> {
902 let mut ctx_builder = QueryContextBuilder::default()
903 .current_catalog(context.catalog.to_string())
904 .current_schema(context.schema.to_string());
905 if let Some(options) = self.options.as_ref() {
906 ctx_builder = ctx_builder
907 .set_extension(
908 TTL_KEY.to_string(),
909 format_duration(options.ttl).to_string(),
910 )
911 .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
912 }
913 let query_ctx = ctx_builder.build().into();
914
915 self.statement_executor
916 .inserter()
917 .handle_row_inserts(
918 requests,
919 query_ctx,
920 self.statement_executor.as_ref(),
921 false,
922 false,
923 )
924 .await
925 .map_err(BoxedError::new)
926 .context(ClientExternalSnafu)
927 .map(|_| ())
928 }
929
930 fn set_options(&mut self, options: &InsertOptions) {
931 self.options = Some(*options);
932 }
933}
934
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    /// Builds an [`OptionMap`] from `(key, value)` string pairs.
    fn option_map(pairs: &[(&str, &str)]) -> OptionMap {
        let entries = pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect::<HashMap<_, _>>();
        OptionMap::from(entries)
    }

    /// Parses a start/end pair as COPY time-range options in the Asia/Shanghai zone.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let map = option_map(&[
            (COPY_DATABASE_TIME_START_KEY, start),
            (COPY_DATABASE_TIME_END_KEY, end),
        ]);
        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        let expected = TimestampRange::new(
            Timestamp::new_second(1649635200),
            Timestamp::new_second(1649664000),
        )
        .unwrap();
        assert_eq!(
            Some(expected),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap()
        );

        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        let timestamp_key = common_datasource::file_format::TIMESTAMP_FORMAT;
        let format_key = common_datasource::file_format::FORMAT_TYPE;

        let csv_ok = option_map(&[(timestamp_key, "%Y-%m-%d %H:%M:%S"), (format_key, "csv")]);
        assert!(verify_time_related_format(&csv_ok).is_ok());

        let json = option_map(&[(timestamp_key, "%Y-%m-%d %H:%M:%S"), (format_key, "json")]);
        assert_matches!(
            verify_time_related_format(&json).unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );

        let csv_bad = option_map(&[(timestamp_key, "%111112"), (format_key, "csv")]);
        assert_matches!(
            verify_time_related_format(&csv_bad).unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}