1mod admin;
16mod copy_database;
17mod copy_query_to;
18mod copy_table_from;
19mod copy_table_to;
20mod cursor;
21pub mod ddl;
22mod describe;
23mod dml;
24mod kill;
25mod set;
26mod show;
27mod tql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::v1::RowInsertRequests;
33use catalog::CatalogManagerRef;
34use catalog::kvbackend::KvBackendCatalogManager;
35use catalog::process_manager::ProcessManagerRef;
36use client::RecordBatches;
37use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
38use client::inserter::{InsertOptions, Inserter};
39use common_error::ext::BoxedError;
40use common_meta::cache::TableRouteCacheRef;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::tracing;
50use common_time::Timestamp;
51use common_time::range::TimestampRange;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use humantime::format_duration;
55use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
56use query::QueryEngineRef;
57use query::parser::QueryStatement;
58use session::context::{Channel, QueryContextBuilder, QueryContextRef};
59use session::table_name::table_idents_to_full_name;
60use set::{set_query_timeout, set_read_preference};
61use snafu::{OptionExt, ResultExt, ensure};
62use sql::ast::ObjectNamePartExt;
63use sql::statements::OptionMap;
64use sql::statements::copy::{
65 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
66};
67use sql::statements::set_variables::SetVariables;
68use sql::statements::show::ShowCreateTableVariant;
69use sql::statements::statement::Statement;
70use sql::util::format_raw_object_name;
71use sqlparser::ast::ObjectName;
72use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
73use table::TableRef;
74use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
75use table::table_name::TableName;
76use table::table_reference::TableReference;
77
78use self::set::{
79 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
80};
81use crate::error::{
82 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
83 PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
84 TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
85};
86use crate::insert::InserterRef;
87use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
88use crate::statement::set::set_allow_query_fallback;
89
90#[derive(Clone)]
91pub struct StatementExecutor {
92 catalog_manager: CatalogManagerRef,
93 query_engine: QueryEngineRef,
94 procedure_executor: ProcedureExecutorRef,
95 table_metadata_manager: TableMetadataManagerRef,
96 flow_metadata_manager: FlowMetadataManagerRef,
97 view_info_manager: ViewInfoManagerRef,
98 partition_manager: PartitionRuleManagerRef,
99 cache_invalidator: CacheInvalidatorRef,
100 inserter: InserterRef,
101 process_manager: Option<ProcessManagerRef>,
102 #[cfg(feature = "enterprise")]
103 trigger_querier: Option<TriggerQuerierRef>,
104}
105
106pub type StatementExecutorRef = Arc<StatementExecutor>;
107
108#[cfg(feature = "enterprise")]
110pub trait TriggerQuerierFactory: Send + Sync {
111 fn create(&self, kv_backend: KvBackendRef) -> TriggerQuerierRef;
112}
113
114#[cfg(feature = "enterprise")]
115pub type TriggerQuerierFactoryRef = Arc<dyn TriggerQuerierFactory>;
116
117#[cfg(feature = "enterprise")]
119#[async_trait::async_trait]
120pub trait TriggerQuerier: Send + Sync {
121 async fn show_create_trigger(
123 &self,
124 catalog: &str,
125 trigger: &str,
126 query_ctx: &QueryContextRef,
127 ) -> std::result::Result<Output, BoxedError>;
128
129 fn as_any(&self) -> &dyn std::any::Any;
130}
131
132#[cfg(feature = "enterprise")]
133pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
134
135impl StatementExecutor {
136 #[allow(clippy::too_many_arguments)]
137 pub fn new(
138 catalog_manager: CatalogManagerRef,
139 query_engine: QueryEngineRef,
140 procedure_executor: ProcedureExecutorRef,
141 kv_backend: KvBackendRef,
142 cache_invalidator: CacheInvalidatorRef,
143 inserter: InserterRef,
144 table_route_cache: TableRouteCacheRef,
145 process_manager: Option<ProcessManagerRef>,
146 ) -> Self {
147 Self {
148 catalog_manager,
149 query_engine,
150 procedure_executor,
151 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
152 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
153 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
154 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
155 cache_invalidator,
156 inserter,
157 process_manager,
158 #[cfg(feature = "enterprise")]
159 trigger_querier: None,
160 }
161 }
162
163 #[cfg(feature = "enterprise")]
164 pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
165 self.trigger_querier = Some(querier);
166 self
167 }
168
169 #[cfg(feature = "testing")]
170 pub async fn execute_stmt(
171 &self,
172 stmt: QueryStatement,
173 query_ctx: QueryContextRef,
174 ) -> Result<Output> {
175 match stmt {
176 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
177 QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
178 }
179 }
180
181 #[tracing::instrument(skip_all)]
182 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
183 match stmt {
184 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
185 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
186 }
187
188 Statement::DeclareCursor(declare_cursor) => {
189 self.declare_cursor(declare_cursor, query_ctx).await
190 }
191 Statement::FetchCursor(fetch_cursor) => {
192 self.fetch_cursor(fetch_cursor, query_ctx).await
193 }
194 Statement::CloseCursor(close_cursor) => {
195 self.close_cursor(close_cursor, query_ctx).await
196 }
197
198 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
199
200 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
201
202 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
203
204 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
205
206 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
207
208 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
209
210 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
211
212 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
213
214 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
215
216 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
217
218 #[cfg(feature = "enterprise")]
219 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
220
221 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
222 let query_output = self
223 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
224 .await?;
225 let req = to_copy_query_request(stmt.arg)?;
226
227 self.copy_query_to(req, query_output)
228 .await
229 .map(Output::new_with_affected_rows)
230 }
231
232 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
233 let req = to_copy_table_request(stmt, query_ctx.clone())?;
234 match req.direction {
235 CopyDirection::Export => self
236 .copy_table_to(req, query_ctx)
237 .await
238 .map(Output::new_with_affected_rows),
239 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
240 }
241 }
242
243 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
244 match copy_database {
245 CopyDatabase::To(arg) => {
246 self.copy_database_to(
247 to_copy_database_request(arg, &query_ctx)?,
248 query_ctx.clone(),
249 )
250 .await
251 }
252 CopyDatabase::From(arg) => {
253 self.copy_database_from(
254 to_copy_database_request(arg, &query_ctx)?,
255 query_ctx,
256 )
257 .await
258 }
259 }
260 }
261
262 Statement::CreateTable(stmt) => {
263 let _ = self.create_table(stmt, query_ctx).await?;
264 Ok(Output::new_with_affected_rows(0))
265 }
266 Statement::CreateTableLike(stmt) => {
267 let _ = self.create_table_like(stmt, query_ctx).await?;
268 Ok(Output::new_with_affected_rows(0))
269 }
270 Statement::CreateExternalTable(stmt) => {
271 let _ = self.create_external_table(stmt, query_ctx).await?;
272 Ok(Output::new_with_affected_rows(0))
273 }
274 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
275 #[cfg(feature = "enterprise")]
276 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
277 Statement::DropFlow(stmt) => {
278 self.drop_flow(
279 query_ctx.current_catalog().to_string(),
280 format_raw_object_name(stmt.flow_name()),
281 stmt.drop_if_exists(),
282 query_ctx,
283 )
284 .await
285 }
286 #[cfg(feature = "enterprise")]
287 Statement::DropTrigger(stmt) => {
288 self.drop_trigger(
289 query_ctx.current_catalog().to_string(),
290 format_raw_object_name(stmt.trigger_name()),
291 stmt.drop_if_exists(),
292 query_ctx,
293 )
294 .await
295 }
296 Statement::CreateView(stmt) => {
297 let _ = self.create_view(stmt, query_ctx).await?;
298 Ok(Output::new_with_affected_rows(0))
299 }
300 Statement::DropView(stmt) => {
301 let (catalog_name, schema_name, view_name) =
302 table_idents_to_full_name(&stmt.view_name, &query_ctx)
303 .map_err(BoxedError::new)
304 .context(ExternalSnafu)?;
305
306 self.drop_view(
307 catalog_name,
308 schema_name,
309 view_name,
310 stmt.drop_if_exists,
311 query_ctx,
312 )
313 .await
314 }
315 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
316
317 Statement::AlterDatabase(alter_database) => {
318 self.alter_database(alter_database, query_ctx).await
319 }
320
321 #[cfg(feature = "enterprise")]
322 Statement::AlterTrigger(alter_trigger) => {
323 self.alter_trigger(alter_trigger, query_ctx).await
324 }
325
326 Statement::DropTable(stmt) => {
327 let mut table_names = Vec::with_capacity(stmt.table_names().len());
328 for table_name_stmt in stmt.table_names() {
329 let (catalog, schema, table) =
330 table_idents_to_full_name(table_name_stmt, &query_ctx)
331 .map_err(BoxedError::new)
332 .context(ExternalSnafu)?;
333 table_names.push(TableName::new(catalog, schema, table));
334 }
335 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
336 .await
337 }
338 Statement::DropDatabase(stmt) => {
339 self.drop_database(
340 query_ctx.current_catalog().to_string(),
341 format_raw_object_name(stmt.name()),
342 stmt.drop_if_exists(),
343 query_ctx,
344 )
345 .await
346 }
347 Statement::TruncateTable(stmt) => {
348 let (catalog, schema, table) =
349 table_idents_to_full_name(stmt.table_name(), &query_ctx)
350 .map_err(BoxedError::new)
351 .context(ExternalSnafu)?;
352 let table_name = TableName::new(catalog, schema, table);
353 let time_ranges = self
354 .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
355 .await?;
356 self.truncate_table(table_name, time_ranges, query_ctx)
357 .await
358 }
359 Statement::CreateDatabase(stmt) => {
360 self.create_database(
361 &format_raw_object_name(&stmt.name),
362 stmt.if_not_exists,
363 stmt.options.into_map(),
364 query_ctx,
365 )
366 .await
367 }
368 Statement::ShowCreateDatabase(show) => {
369 let (catalog, database) =
370 idents_to_full_database_name(&show.database_name, &query_ctx)
371 .map_err(BoxedError::new)
372 .context(ExternalSnafu)?;
373 let table_metadata_manager = self
374 .catalog_manager
375 .as_any()
376 .downcast_ref::<KvBackendCatalogManager>()
377 .map(|manager| manager.table_metadata_manager_ref().clone())
378 .context(UpgradeCatalogManagerRefSnafu)?;
379 let opts: HashMap<String, String> = table_metadata_manager
380 .schema_manager()
381 .get(SchemaNameKey::new(&catalog, &database))
382 .await
383 .context(TableMetadataManagerSnafu)?
384 .context(SchemaNotFoundSnafu {
385 schema_info: &database,
386 })?
387 .into_inner()
388 .into();
389
390 self.show_create_database(&database, opts.into()).await
391 }
392 Statement::ShowCreateTable(show) => {
393 let (catalog, schema, table) =
394 table_idents_to_full_name(&show.table_name, &query_ctx)
395 .map_err(BoxedError::new)
396 .context(ExternalSnafu)?;
397
398 let table_ref = self
399 .catalog_manager
400 .table(&catalog, &schema, &table, Some(&query_ctx))
401 .await
402 .context(CatalogSnafu)?
403 .context(TableNotFoundSnafu { table_name: &table })?;
404 let table_name = TableName::new(catalog, schema, table);
405
406 match show.variant {
407 ShowCreateTableVariant::Original => {
408 self.show_create_table(table_name, table_ref, query_ctx)
409 .await
410 }
411 ShowCreateTableVariant::PostgresForeignTable => {
412 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
413 .await
414 }
415 }
416 }
417 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
418 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
419 #[cfg(feature = "enterprise")]
420 Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
421 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
422 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
423 Statement::ShowColumns(show_columns) => {
424 self.show_columns(show_columns, query_ctx).await
425 }
426 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
427 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
428 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
429 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
430 Statement::Use(db) => self.use_database(db, query_ctx).await,
431 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
432 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
433 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
434 }
435 }
436
437 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
438 let catalog = query_ctx.current_catalog();
439 ensure!(
440 self.catalog_manager
441 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
442 .await
443 .context(CatalogSnafu)?,
444 SchemaNotFoundSnafu { schema_info: &db }
445 );
446
447 query_ctx.set_current_schema(&db);
448
449 Ok(Output::new_with_record_batches(RecordBatches::empty()))
450 }
451
452 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
453 let var_name = set_var.variable.to_string().to_uppercase();
454
455 match var_name.as_str() {
456 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
457
458 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
459 set_timezone(set_var.value, query_ctx)?
460 }
461
462 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
463
464 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
468
469 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
471
472 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
473 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
474 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
475 Channel::Postgres => {
476 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
477 }
478 _ => {
479 return NotSupportedSnafu {
480 feat: format!("Unsupported set variable {}", var_name),
481 }
482 .fail();
483 }
484 },
485 "STATEMENT_TIMEOUT" => {
486 if query_ctx.channel() == Channel::Postgres {
487 set_query_timeout(set_var.value, query_ctx)?
488 } else {
489 return NotSupportedSnafu {
490 feat: format!("Unsupported set variable {}", var_name),
491 }
492 .fail();
493 }
494 }
495 "SEARCH_PATH" => {
496 if query_ctx.channel() == Channel::Postgres {
497 set_search_path(set_var.value, query_ctx)?
498 } else {
499 return NotSupportedSnafu {
500 feat: format!("Unsupported set variable {}", var_name),
501 }
502 .fail();
503 }
504 }
505 _ => {
506 if query_ctx.channel() == Channel::Postgres {
511 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
512 } else if query_ctx.channel() == Channel::Mysql && var_name.starts_with("@@") {
513 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
515 } else {
516 return NotSupportedSnafu {
517 feat: format!("Unsupported set variable {}", var_name),
518 }
519 .fail();
520 }
521 }
522 }
523 Ok(Output::new_with_affected_rows(0))
524 }
525
526 #[tracing::instrument(skip_all)]
527 pub async fn plan(
528 &self,
529 stmt: &QueryStatement,
530 query_ctx: QueryContextRef,
531 ) -> Result<LogicalPlan> {
532 self.query_engine
533 .planner()
534 .plan(stmt, query_ctx)
535 .await
536 .context(PlanStatementSnafu)
537 }
538
539 #[tracing::instrument(skip_all)]
541 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
542 self.query_engine
543 .execute(plan, query_ctx)
544 .await
545 .context(ExecLogicalPlanSnafu)
546 }
547
548 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
549 self.query_engine
550 .planner()
551 .optimize(plan)
552 .context(PlanStatementSnafu)
553 }
554
555 #[tracing::instrument(skip_all)]
556 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
557 let plan = self.plan(&stmt, query_ctx.clone()).await?;
558 self.exec_plan(plan, query_ctx).await
559 }
560
561 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
562 let TableReference {
563 catalog,
564 schema,
565 table,
566 } = table_ref;
567 self.catalog_manager
568 .table(catalog, schema, table, None)
569 .await
570 .context(CatalogSnafu)?
571 .with_context(|| TableNotFoundSnafu {
572 table_name: table_ref.to_string(),
573 })
574 }
575
576 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
577 &self.procedure_executor
578 }
579
580 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
581 &self.cache_invalidator
582 }
583
584 pub async fn convert_truncate_time_ranges(
587 &self,
588 table_name: &TableName,
589 sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
590 query_ctx: &QueryContextRef,
591 ) -> Result<Vec<(Timestamp, Timestamp)>> {
592 if sql_values_time_range.is_empty() {
593 return Ok(vec![]);
594 }
595 let table = self.get_table(&table_name.table_ref()).await?;
596 let info = table.table_info();
597 let time_index_dt = info
598 .meta
599 .schema
600 .timestamp_column()
601 .context(UnexpectedSnafu {
602 violated: "Table must have a timestamp column",
603 })?;
604
605 let time_unit = time_index_dt
606 .data_type
607 .as_timestamp()
608 .with_context(|| UnexpectedSnafu {
609 violated: format!(
610 "Table {}'s time index column must be a timestamp type, found: {:?}",
611 table_name, time_index_dt
612 ),
613 })?
614 .unit();
615
616 let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
617 for (start, end) in sql_values_time_range {
618 let start = common_sql::convert::sql_value_to_value(
619 "range_start",
620 &ConcreteDataType::timestamp_datatype(time_unit),
621 start,
622 Some(&query_ctx.timezone()),
623 None,
624 false,
625 )
626 .context(SqlCommonSnafu)
627 .and_then(|v| {
628 if let datatypes::value::Value::Timestamp(t) = v {
629 Ok(t)
630 } else {
631 error::InvalidSqlSnafu {
632 err_msg: format!("Expected a timestamp value, found {v:?}"),
633 }
634 .fail()
635 }
636 })?;
637
638 let end = common_sql::convert::sql_value_to_value(
639 "range_end",
640 &ConcreteDataType::timestamp_datatype(time_unit),
641 end,
642 Some(&query_ctx.timezone()),
643 None,
644 false,
645 )
646 .context(SqlCommonSnafu)
647 .and_then(|v| {
648 if let datatypes::value::Value::Timestamp(t) = v {
649 Ok(t)
650 } else {
651 error::InvalidSqlSnafu {
652 err_msg: format!("Expected a timestamp value, found {v:?}"),
653 }
654 .fail()
655 }
656 })?;
657 time_ranges.push((start, end));
658 }
659 Ok(time_ranges)
660 }
661
662 pub(crate) fn inserter(&self) -> &InserterRef {
664 &self.inserter
665 }
666}
667
668fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
669 let CopyQueryToArgument {
670 with,
671 connection,
672 location,
673 } = stmt;
674
675 Ok(CopyQueryToRequest {
676 location,
677 with: with.into_map(),
678 connection: connection.into_map(),
679 })
680}
681
682fn verify_time_related_format(with: &OptionMap) -> Result<()> {
684 let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
685 let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
686 let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
687 let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
688
689 if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
690 ensure!(
691 time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
692 error::TimestampFormatNotSupportedSnafu {
693 format: "<unknown>".to_string(),
694 file_format: file_format.cloned().unwrap_or_default(),
695 }
696 );
697 }
698
699 for (key, format_opt) in [
700 (common_datasource::file_format::TIME_FORMAT, time_format),
701 (common_datasource::file_format::DATE_FORMAT, date_format),
702 (
703 common_datasource::file_format::TIMESTAMP_FORMAT,
704 timestamp_format,
705 ),
706 ] {
707 if let Some(format) = format_opt {
708 chrono::format::strftime::StrftimeItems::new(format)
709 .parse()
710 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
711 }
712 }
713
714 Ok(())
715}
716
717fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
718 let direction = match stmt {
719 CopyTable::To(_) => CopyDirection::Export,
720 CopyTable::From(_) => CopyDirection::Import,
721 };
722
723 let CopyTableArgument {
724 location,
725 connection,
726 with,
727 table_name,
728 limit,
729 ..
730 } = match stmt {
731 CopyTable::To(arg) => arg,
732 CopyTable::From(arg) => arg,
733 };
734 let (catalog_name, schema_name, table_name) =
735 table_idents_to_full_name(&table_name, &query_ctx)
736 .map_err(BoxedError::new)
737 .context(ExternalSnafu)?;
738
739 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
740
741 verify_time_related_format(&with)?;
742
743 let pattern = with
744 .get(common_datasource::file_format::FILE_PATTERN)
745 .cloned();
746
747 Ok(CopyTableRequest {
748 catalog_name,
749 schema_name,
750 table_name,
751 location,
752 with: with.into_map(),
753 connection: connection.into_map(),
754 pattern,
755 direction,
756 timestamp_range,
757 limit,
758 })
759}
760
761fn to_copy_database_request(
764 arg: CopyDatabaseArgument,
765 query_ctx: &QueryContextRef,
766) -> Result<CopyDatabaseRequest> {
767 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
768 .map_err(BoxedError::new)
769 .context(ExternalSnafu)?;
770 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
771
772 Ok(CopyDatabaseRequest {
773 catalog_name,
774 schema_name: database_name,
775 location: arg.location,
776 with: arg.with.into_map(),
777 connection: arg.connection.into_map(),
778 time_range,
779 })
780}
781
782fn timestamp_range_from_option_map(
786 options: &OptionMap,
787 query_ctx: &QueryContextRef,
788) -> Result<Option<TimestampRange>> {
789 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
790 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
791 let time_range = match (start_timestamp, end_timestamp) {
792 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
793 error::InvalidTimestampRangeSnafu {
794 start: start.to_iso8601_string(),
795 end: end.to_iso8601_string(),
796 }
797 })?),
798 (Some(start), None) => Some(TimestampRange::from_start(start)),
799 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
801 };
802 Ok(time_range)
803}
804
805fn extract_timestamp(
807 map: &OptionMap,
808 key: &str,
809 query_ctx: &QueryContextRef,
810) -> Result<Option<Timestamp>> {
811 map.get(key)
812 .map(|v| {
813 Timestamp::from_str(v, Some(&query_ctx.timezone()))
814 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
815 })
816 .transpose()
817}
818
819fn idents_to_full_database_name(
820 obj_name: &ObjectName,
821 query_ctx: &QueryContextRef,
822) -> Result<(String, String)> {
823 match &obj_name.0[..] {
824 [database] => Ok((
825 query_ctx.current_catalog().to_owned(),
826 database.to_string_unquoted(),
827 )),
828 [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
829 _ => InvalidSqlSnafu {
830 err_msg: format!(
831 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
832 ),
833 }
834 .fail(),
835 }
836}
837
838pub struct InserterImpl {
840 statement_executor: StatementExecutorRef,
841 options: Option<InsertOptions>,
842}
843
844impl InserterImpl {
845 pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
846 Self {
847 statement_executor,
848 options,
849 }
850 }
851}
852
853#[async_trait::async_trait]
854impl Inserter for InserterImpl {
855 async fn insert_rows(
856 &self,
857 context: &client::inserter::Context<'_>,
858 requests: RowInsertRequests,
859 ) -> ClientResult<()> {
860 let mut ctx_builder = QueryContextBuilder::default()
861 .current_catalog(context.catalog.to_string())
862 .current_schema(context.schema.to_string());
863 if let Some(options) = self.options.as_ref() {
864 ctx_builder = ctx_builder
865 .set_extension(
866 TTL_KEY.to_string(),
867 format_duration(options.ttl).to_string(),
868 )
869 .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
870 }
871 let query_ctx = ctx_builder.build().into();
872
873 self.statement_executor
874 .inserter()
875 .handle_row_inserts(
876 requests,
877 query_ctx,
878 self.statement_executor.as_ref(),
879 false,
880 false,
881 )
882 .await
883 .map_err(BoxedError::new)
884 .context(ClientExternalSnafu)
885 .map(|_| ())
886 }
887
888 fn set_options(&mut self, options: &InsertOptions) {
889 self.options = Some(*options);
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use std::assert_matches::assert_matches;
896 use std::collections::HashMap;
897
898 use common_time::range::TimestampRange;
899 use common_time::{Timestamp, Timezone};
900 use session::context::QueryContextBuilder;
901 use sql::statements::OptionMap;
902
903 use crate::error;
904 use crate::statement::copy_database::{
905 COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
906 };
907 use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};
908
909 fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
910 let query_ctx = QueryContextBuilder::default()
911 .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
912 .build()
913 .into();
914 let map = OptionMap::from(
915 [
916 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
917 (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
918 ]
919 .into_iter()
920 .collect::<HashMap<_, _>>(),
921 );
922 timestamp_range_from_option_map(&map, &query_ctx)
923 }
924
925 #[test]
926 fn test_timestamp_range_from_option_map() {
927 assert_eq!(
928 Some(
929 TimestampRange::new(
930 Timestamp::new_second(1649635200),
931 Timestamp::new_second(1649664000),
932 )
933 .unwrap(),
934 ),
935 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
936 );
937
938 assert_matches!(
939 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
940 error::Error::InvalidTimestampRange { .. }
941 );
942 }
943
944 #[test]
945 fn test_verify_timestamp_format() {
946 let map = OptionMap::from(
947 [
948 (
949 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
950 "%Y-%m-%d %H:%M:%S".to_string(),
951 ),
952 (
953 common_datasource::file_format::FORMAT_TYPE.to_string(),
954 "csv".to_string(),
955 ),
956 ]
957 .into_iter()
958 .collect::<HashMap<_, _>>(),
959 );
960 assert!(verify_time_related_format(&map).is_ok());
961
962 let map = OptionMap::from(
963 [
964 (
965 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
966 "%Y-%m-%d %H:%M:%S".to_string(),
967 ),
968 (
969 common_datasource::file_format::FORMAT_TYPE.to_string(),
970 "json".to_string(),
971 ),
972 ]
973 .into_iter()
974 .collect::<HashMap<_, _>>(),
975 );
976
977 assert_matches!(
978 verify_time_related_format(&map).unwrap_err(),
979 error::Error::TimestampFormatNotSupported { .. }
980 );
981 let map = OptionMap::from(
982 [
983 (
984 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
985 "%111112".to_string(),
986 ),
987 (
988 common_datasource::file_format::FORMAT_TYPE.to_string(),
989 "csv".to_string(),
990 ),
991 ]
992 .into_iter()
993 .collect::<HashMap<_, _>>(),
994 );
995
996 assert_matches!(
997 verify_time_related_format(&map).unwrap_err(),
998 error::Error::InvalidCopyParameter { .. }
999 );
1000 }
1001}