1mod admin;
16mod comment;
17mod copy_database;
18mod copy_query_to;
19mod copy_table_from;
20mod copy_table_to;
21mod cursor;
22pub mod ddl;
23mod describe;
24mod dml;
25mod kill;
26mod set;
27mod show;
28mod tql;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32
33use api::v1::RowInsertRequests;
34use catalog::CatalogManagerRef;
35use catalog::kvbackend::KvBackendCatalogManager;
36use catalog::process_manager::ProcessManagerRef;
37use client::RecordBatches;
38use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
39use client::inserter::{InsertOptions, Inserter};
40use common_error::ext::BoxedError;
41use common_meta::cache::TableRouteCacheRef;
42use common_meta::cache_invalidator::CacheInvalidatorRef;
43use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
44use common_meta::key::schema_name::SchemaNameKey;
45use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
46use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
47use common_meta::kv_backend::KvBackendRef;
48use common_meta::procedure_executor::ProcedureExecutorRef;
49use common_query::Output;
50use common_telemetry::{debug, tracing, warn};
51use common_time::Timestamp;
52use common_time::range::TimestampRange;
53use datafusion_expr::LogicalPlan;
54use datatypes::prelude::ConcreteDataType;
55use datatypes::schema::ColumnSchema;
56use humantime::format_duration;
57use itertools::Itertools;
58use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
59use query::QueryEngineRef;
60use query::parser::QueryStatement;
61use session::context::{Channel, QueryContextBuilder, QueryContextRef};
62use session::table_name::table_idents_to_full_name;
63use set::{set_query_timeout, set_read_preference};
64use snafu::{OptionExt, ResultExt, ensure};
65use sql::ast::ObjectNamePartExt;
66use sql::statements::OptionMap;
67use sql::statements::copy::{
68 CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
69};
70use sql::statements::set_variables::SetVariables;
71use sql::statements::show::ShowCreateTableVariant;
72use sql::statements::statement::Statement;
73use sql::util::format_raw_object_name;
74use sqlparser::ast::ObjectName;
75use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
76use table::TableRef;
77use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
78use table::table_name::TableName;
79use table::table_reference::TableReference;
80
81use self::set::{
82 set_bytea_output, set_datestyle, set_search_path, set_timezone, validate_client_encoding,
83};
84use crate::error::{
85 self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
86 PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
87 TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
88};
89use crate::insert::InserterRef;
90use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
91use crate::statement::set::set_allow_query_fallback;
92
93#[async_trait::async_trait]
95pub trait StatementExecutorConfigurator: Send + Sync {
96 async fn configure(
97 &self,
98 executor: StatementExecutor,
99 ctx: ExecutorConfigureContext,
100 ) -> std::result::Result<StatementExecutor, BoxedError>;
101}
102
103pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
104
105pub struct ExecutorConfigureContext {
106 pub kv_backend: KvBackendRef,
107}
108
109#[derive(Clone)]
110pub struct StatementExecutor {
111 catalog_manager: CatalogManagerRef,
112 query_engine: QueryEngineRef,
113 procedure_executor: ProcedureExecutorRef,
114 table_metadata_manager: TableMetadataManagerRef,
115 flow_metadata_manager: FlowMetadataManagerRef,
116 view_info_manager: ViewInfoManagerRef,
117 partition_manager: PartitionRuleManagerRef,
118 cache_invalidator: CacheInvalidatorRef,
119 inserter: InserterRef,
120 process_manager: Option<ProcessManagerRef>,
121 #[cfg(feature = "enterprise")]
122 trigger_querier: Option<TriggerQuerierRef>,
123}
124
125pub type StatementExecutorRef = Arc<StatementExecutor>;
126
127#[cfg(feature = "enterprise")]
129#[async_trait::async_trait]
130pub trait TriggerQuerier: Send + Sync {
131 async fn show_create_trigger(
133 &self,
134 catalog: &str,
135 trigger: &str,
136 query_ctx: &QueryContextRef,
137 ) -> std::result::Result<Output, BoxedError>;
138
139 fn as_any(&self) -> &dyn std::any::Any;
140}
141
142#[cfg(feature = "enterprise")]
143pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
144
145impl StatementExecutor {
146 #[allow(clippy::too_many_arguments)]
147 pub fn new(
148 catalog_manager: CatalogManagerRef,
149 query_engine: QueryEngineRef,
150 procedure_executor: ProcedureExecutorRef,
151 kv_backend: KvBackendRef,
152 cache_invalidator: CacheInvalidatorRef,
153 inserter: InserterRef,
154 table_route_cache: TableRouteCacheRef,
155 process_manager: Option<ProcessManagerRef>,
156 ) -> Self {
157 Self {
158 catalog_manager,
159 query_engine,
160 procedure_executor,
161 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
162 flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
163 view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
164 partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
165 cache_invalidator,
166 inserter,
167 process_manager,
168 #[cfg(feature = "enterprise")]
169 trigger_querier: None,
170 }
171 }
172
173 #[cfg(feature = "enterprise")]
174 pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
175 self.trigger_querier = Some(querier);
176 self
177 }
178
179 #[cfg(feature = "testing")]
180 pub async fn execute_stmt(
181 &self,
182 stmt: QueryStatement,
183 query_ctx: QueryContextRef,
184 ) -> Result<Output> {
185 match stmt {
186 QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
187 QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
188 }
189 }
190
191 #[tracing::instrument(skip_all)]
192 pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
193 match stmt {
194 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
195 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
196 }
197
198 Statement::DeclareCursor(declare_cursor) => {
199 self.declare_cursor(declare_cursor, query_ctx).await
200 }
201 Statement::FetchCursor(fetch_cursor) => {
202 self.fetch_cursor(fetch_cursor, query_ctx).await
203 }
204 Statement::CloseCursor(close_cursor) => {
205 self.close_cursor(close_cursor, query_ctx).await
206 }
207
208 Statement::Insert(insert) => self.insert(insert, query_ctx).await,
209
210 Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
211
212 Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
213
214 Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,
215
216 Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
217
218 Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,
219
220 Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,
221
222 Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,
223
224 Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
225
226 Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
227
228 #[cfg(feature = "enterprise")]
229 Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,
230
231 Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
232 let query_output = self
233 .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
234 .await?;
235 let req = to_copy_query_request(stmt.arg)?;
236
237 self.copy_query_to(req, query_output)
238 .await
239 .map(Output::new_with_affected_rows)
240 }
241
242 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
243 let req = to_copy_table_request(stmt, query_ctx.clone())?;
244 match req.direction {
245 CopyDirection::Export => self
246 .copy_table_to(req, query_ctx)
247 .await
248 .map(Output::new_with_affected_rows),
249 CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
250 }
251 }
252
253 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
254 match copy_database {
255 CopyDatabase::To(arg) => {
256 self.copy_database_to(
257 to_copy_database_request(arg, &query_ctx)?,
258 query_ctx.clone(),
259 )
260 .await
261 }
262 CopyDatabase::From(arg) => {
263 self.copy_database_from(
264 to_copy_database_request(arg, &query_ctx)?,
265 query_ctx,
266 )
267 .await
268 }
269 }
270 }
271
272 Statement::CreateTable(stmt) => {
273 let _ = self.create_table(stmt, query_ctx).await?;
274 Ok(Output::new_with_affected_rows(0))
275 }
276 Statement::CreateTableLike(stmt) => {
277 let _ = self.create_table_like(stmt, query_ctx).await?;
278 Ok(Output::new_with_affected_rows(0))
279 }
280 Statement::CreateExternalTable(stmt) => {
281 let _ = self.create_external_table(stmt, query_ctx).await?;
282 Ok(Output::new_with_affected_rows(0))
283 }
284 Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
285 #[cfg(feature = "enterprise")]
286 Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
287 Statement::DropFlow(stmt) => {
288 self.drop_flow(
289 query_ctx.current_catalog().to_string(),
290 format_raw_object_name(stmt.flow_name()),
291 stmt.drop_if_exists(),
292 query_ctx,
293 )
294 .await
295 }
296 #[cfg(feature = "enterprise")]
297 Statement::DropTrigger(stmt) => {
298 self.drop_trigger(
299 query_ctx.current_catalog().to_string(),
300 format_raw_object_name(stmt.trigger_name()),
301 stmt.drop_if_exists(),
302 query_ctx,
303 )
304 .await
305 }
306 Statement::CreateView(stmt) => {
307 let _ = self.create_view(stmt, query_ctx).await?;
308 Ok(Output::new_with_affected_rows(0))
309 }
310 Statement::DropView(stmt) => {
311 let (catalog_name, schema_name, view_name) =
312 table_idents_to_full_name(&stmt.view_name, &query_ctx)
313 .map_err(BoxedError::new)
314 .context(ExternalSnafu)?;
315
316 self.drop_view(
317 catalog_name,
318 schema_name,
319 view_name,
320 stmt.drop_if_exists,
321 query_ctx,
322 )
323 .await
324 }
325 Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
326
327 Statement::AlterDatabase(alter_database) => {
328 self.alter_database(alter_database, query_ctx).await
329 }
330
331 #[cfg(feature = "enterprise")]
332 Statement::AlterTrigger(alter_trigger) => {
333 self.alter_trigger(alter_trigger, query_ctx).await
334 }
335
336 Statement::DropTable(stmt) => {
337 let mut table_names = Vec::with_capacity(stmt.table_names().len());
338 for table_name_stmt in stmt.table_names() {
339 let (catalog, schema, table) =
340 table_idents_to_full_name(table_name_stmt, &query_ctx)
341 .map_err(BoxedError::new)
342 .context(ExternalSnafu)?;
343 table_names.push(TableName::new(catalog, schema, table));
344 }
345 self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
346 .await
347 }
348 Statement::DropDatabase(stmt) => {
349 self.drop_database(
350 query_ctx.current_catalog().to_string(),
351 format_raw_object_name(stmt.name()),
352 stmt.drop_if_exists(),
353 query_ctx,
354 )
355 .await
356 }
357 Statement::TruncateTable(stmt) => {
358 let (catalog, schema, table) =
359 table_idents_to_full_name(stmt.table_name(), &query_ctx)
360 .map_err(BoxedError::new)
361 .context(ExternalSnafu)?;
362 let table_name = TableName::new(catalog, schema, table);
363 let time_ranges = self
364 .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
365 .await?;
366 self.truncate_table(table_name, time_ranges, query_ctx)
367 .await
368 }
369 Statement::CreateDatabase(stmt) => {
370 self.create_database(
371 &format_raw_object_name(&stmt.name),
372 stmt.if_not_exists,
373 stmt.options.into_map(),
374 query_ctx,
375 )
376 .await
377 }
378 Statement::ShowCreateDatabase(show) => {
379 let (catalog, database) =
380 idents_to_full_database_name(&show.database_name, &query_ctx)
381 .map_err(BoxedError::new)
382 .context(ExternalSnafu)?;
383 let table_metadata_manager = self
384 .catalog_manager
385 .as_any()
386 .downcast_ref::<KvBackendCatalogManager>()
387 .map(|manager| manager.table_metadata_manager_ref().clone())
388 .context(UpgradeCatalogManagerRefSnafu)?;
389 let opts: HashMap<String, String> = table_metadata_manager
390 .schema_manager()
391 .get(SchemaNameKey::new(&catalog, &database))
392 .await
393 .context(TableMetadataManagerSnafu)?
394 .context(SchemaNotFoundSnafu {
395 schema_info: &database,
396 })?
397 .into_inner()
398 .into();
399
400 self.show_create_database(&database, opts.into()).await
401 }
402 Statement::ShowCreateTable(show) => {
403 let (catalog, schema, table) =
404 table_idents_to_full_name(&show.table_name, &query_ctx)
405 .map_err(BoxedError::new)
406 .context(ExternalSnafu)?;
407
408 let table_ref = self
409 .catalog_manager
410 .table(&catalog, &schema, &table, Some(&query_ctx))
411 .await
412 .context(CatalogSnafu)?
413 .context(TableNotFoundSnafu { table_name: &table })?;
414 let table_name = TableName::new(catalog, schema, table);
415
416 match show.variant {
417 ShowCreateTableVariant::Original => {
418 self.show_create_table(table_name, table_ref, query_ctx)
419 .await
420 }
421 ShowCreateTableVariant::PostgresForeignTable => {
422 self.show_create_table_for_pg(table_name, table_ref, query_ctx)
423 .await
424 }
425 }
426 }
427 Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
428 Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
429 #[cfg(feature = "enterprise")]
430 Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
431 Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
432 Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
433 Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
434 Statement::ShowColumns(show_columns) => {
435 self.show_columns(show_columns, query_ctx).await
436 }
437 Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
438 Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
439 Statement::ShowStatus(_) => self.show_status(query_ctx).await,
440 Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
441 Statement::Use(db) => self.use_database(db, query_ctx).await,
442 Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
443 Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
444 Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
445 }
446 }
447
448 pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
449 let catalog = query_ctx.current_catalog();
450 ensure!(
451 self.catalog_manager
452 .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
453 .await
454 .context(CatalogSnafu)?,
455 SchemaNotFoundSnafu { schema_info: &db }
456 );
457
458 query_ctx.set_current_schema(&db);
459
460 Ok(Output::new_with_record_batches(RecordBatches::empty()))
461 }
462
463 fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
464 let var_name = set_var.variable.to_string().to_uppercase();
465
466 debug!(
467 "Trying to set {}={} for session: {} ",
468 var_name,
469 set_var.value.iter().map(|e| e.to_string()).join(", "),
470 query_ctx.conn_info()
471 );
472
473 match var_name.as_str() {
474 "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,
475
476 "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
477 set_timezone(set_var.value, query_ctx)?
478 }
479
480 "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,
481
482 "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
486
487 "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,
489
490 "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
491 "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
492 Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
493 Channel::Postgres => {
494 warn!(
495 "Unsupported set variable {} for channel {:?}",
496 var_name,
497 query_ctx.channel()
498 );
499 query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
500 }
501 _ => {
502 return NotSupportedSnafu {
503 feat: format!("Unsupported set variable {}", var_name),
504 }
505 .fail();
506 }
507 },
508 "STATEMENT_TIMEOUT" => match query_ctx.channel() {
509 Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
510 Channel::Mysql => {
511 warn!(
512 "Unsupported set variable {} for channel {:?}",
513 var_name,
514 query_ctx.channel()
515 );
516 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
517 }
518 _ => {
519 return NotSupportedSnafu {
520 feat: format!("Unsupported set variable {}", var_name),
521 }
522 .fail();
523 }
524 },
525 "SEARCH_PATH" => {
526 if query_ctx.channel() == Channel::Postgres {
527 set_search_path(set_var.value, query_ctx)?
528 } else {
529 return NotSupportedSnafu {
530 feat: format!("Unsupported set variable {}", var_name),
531 }
532 .fail();
533 }
534 }
535 _ => {
536 if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
537 {
538 warn!(
542 "Unsupported set variable {} for channel {:?}",
543 var_name,
544 query_ctx.channel()
545 );
546 query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
547 } else {
548 return NotSupportedSnafu {
549 feat: format!("Unsupported set variable {}", var_name),
550 }
551 .fail();
552 }
553 }
554 }
555 Ok(Output::new_with_affected_rows(0))
556 }
557
558 #[tracing::instrument(skip_all)]
559 pub async fn plan(
560 &self,
561 stmt: &QueryStatement,
562 query_ctx: QueryContextRef,
563 ) -> Result<LogicalPlan> {
564 self.query_engine
565 .planner()
566 .plan(stmt, query_ctx)
567 .await
568 .context(PlanStatementSnafu)
569 }
570
571 #[tracing::instrument(skip_all)]
573 pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
574 self.query_engine
575 .execute(plan, query_ctx)
576 .await
577 .context(ExecLogicalPlanSnafu)
578 }
579
580 pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
581 self.query_engine
582 .planner()
583 .optimize(plan)
584 .context(PlanStatementSnafu)
585 }
586
587 #[tracing::instrument(skip_all)]
588 async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
589 let plan = self.plan(&stmt, query_ctx.clone()).await?;
590 self.exec_plan(plan, query_ctx).await
591 }
592
593 async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
594 let TableReference {
595 catalog,
596 schema,
597 table,
598 } = table_ref;
599 self.catalog_manager
600 .table(catalog, schema, table, None)
601 .await
602 .context(CatalogSnafu)?
603 .with_context(|| TableNotFoundSnafu {
604 table_name: table_ref.to_string(),
605 })
606 }
607
608 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
609 &self.procedure_executor
610 }
611
612 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
613 &self.cache_invalidator
614 }
615
616 pub async fn convert_truncate_time_ranges(
619 &self,
620 table_name: &TableName,
621 sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
622 query_ctx: &QueryContextRef,
623 ) -> Result<Vec<(Timestamp, Timestamp)>> {
624 if sql_values_time_range.is_empty() {
625 return Ok(vec![]);
626 }
627 let table = self.get_table(&table_name.table_ref()).await?;
628 let info = table.table_info();
629 let time_index_dt = info
630 .meta
631 .schema
632 .timestamp_column()
633 .context(UnexpectedSnafu {
634 violated: "Table must have a timestamp column",
635 })?;
636
637 let time_unit = time_index_dt
638 .data_type
639 .as_timestamp()
640 .with_context(|| UnexpectedSnafu {
641 violated: format!(
642 "Table {}'s time index column must be a timestamp type, found: {:?}",
643 table_name, time_index_dt
644 ),
645 })?
646 .unit();
647
648 let start_column = ColumnSchema::new(
649 "range_start",
650 ConcreteDataType::timestamp_datatype(time_unit),
651 false,
652 );
653 let end_column = ColumnSchema::new(
654 "range_end",
655 ConcreteDataType::timestamp_datatype(time_unit),
656 false,
657 );
658 let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
659 for (start, end) in sql_values_time_range {
660 let start = common_sql::convert::sql_value_to_value(
661 &start_column,
662 start,
663 Some(&query_ctx.timezone()),
664 None,
665 false,
666 )
667 .context(SqlCommonSnafu)
668 .and_then(|v| {
669 if let datatypes::value::Value::Timestamp(t) = v {
670 Ok(t)
671 } else {
672 error::InvalidSqlSnafu {
673 err_msg: format!("Expected a timestamp value, found {v:?}"),
674 }
675 .fail()
676 }
677 })?;
678
679 let end = common_sql::convert::sql_value_to_value(
680 &end_column,
681 end,
682 Some(&query_ctx.timezone()),
683 None,
684 false,
685 )
686 .context(SqlCommonSnafu)
687 .and_then(|v| {
688 if let datatypes::value::Value::Timestamp(t) = v {
689 Ok(t)
690 } else {
691 error::InvalidSqlSnafu {
692 err_msg: format!("Expected a timestamp value, found {v:?}"),
693 }
694 .fail()
695 }
696 })?;
697 time_ranges.push((start, end));
698 }
699 Ok(time_ranges)
700 }
701
702 pub(crate) fn inserter(&self) -> &InserterRef {
704 &self.inserter
705 }
706}
707
708fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
709 let CopyQueryToArgument {
710 with,
711 connection,
712 location,
713 } = stmt;
714
715 Ok(CopyQueryToRequest {
716 location,
717 with: with.into_map(),
718 connection: connection.into_map(),
719 })
720}
721
722fn verify_time_related_format(with: &OptionMap) -> Result<()> {
724 let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
725 let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
726 let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
727 let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
728
729 if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
730 ensure!(
731 time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
732 error::TimestampFormatNotSupportedSnafu {
733 format: "<unknown>".to_string(),
734 file_format: file_format.unwrap_or_default(),
735 }
736 );
737 }
738
739 for (key, format_opt) in [
740 (common_datasource::file_format::TIME_FORMAT, time_format),
741 (common_datasource::file_format::DATE_FORMAT, date_format),
742 (
743 common_datasource::file_format::TIMESTAMP_FORMAT,
744 timestamp_format,
745 ),
746 ] {
747 if let Some(format) = format_opt {
748 chrono::format::strftime::StrftimeItems::new(format)
749 .parse()
750 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
751 }
752 }
753
754 Ok(())
755}
756
757fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
758 let direction = match stmt {
759 CopyTable::To(_) => CopyDirection::Export,
760 CopyTable::From(_) => CopyDirection::Import,
761 };
762
763 let CopyTableArgument {
764 location,
765 connection,
766 with,
767 table_name,
768 limit,
769 ..
770 } = match stmt {
771 CopyTable::To(arg) => arg,
772 CopyTable::From(arg) => arg,
773 };
774 let (catalog_name, schema_name, table_name) =
775 table_idents_to_full_name(&table_name, &query_ctx)
776 .map_err(BoxedError::new)
777 .context(ExternalSnafu)?;
778
779 let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
780
781 verify_time_related_format(&with)?;
782
783 let pattern = with
784 .get(common_datasource::file_format::FILE_PATTERN)
785 .map(|x| x.to_string());
786
787 Ok(CopyTableRequest {
788 catalog_name,
789 schema_name,
790 table_name,
791 location,
792 with: with.into_map(),
793 connection: connection.into_map(),
794 pattern,
795 direction,
796 timestamp_range,
797 limit,
798 })
799}
800
801fn to_copy_database_request(
804 arg: CopyDatabaseArgument,
805 query_ctx: &QueryContextRef,
806) -> Result<CopyDatabaseRequest> {
807 let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
808 .map_err(BoxedError::new)
809 .context(ExternalSnafu)?;
810 let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
811
812 Ok(CopyDatabaseRequest {
813 catalog_name,
814 schema_name: database_name,
815 location: arg.location,
816 with: arg.with.into_map(),
817 connection: arg.connection.into_map(),
818 time_range,
819 })
820}
821
822fn timestamp_range_from_option_map(
826 options: &OptionMap,
827 query_ctx: &QueryContextRef,
828) -> Result<Option<TimestampRange>> {
829 let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
830 let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
831 let time_range = match (start_timestamp, end_timestamp) {
832 (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
833 error::InvalidTimestampRangeSnafu {
834 start: start.to_iso8601_string(),
835 end: end.to_iso8601_string(),
836 }
837 })?),
838 (Some(start), None) => Some(TimestampRange::from_start(start)),
839 (None, Some(end)) => Some(TimestampRange::until_end(end, false)), (None, None) => None,
841 };
842 Ok(time_range)
843}
844
845fn extract_timestamp(
847 map: &OptionMap,
848 key: &str,
849 query_ctx: &QueryContextRef,
850) -> Result<Option<Timestamp>> {
851 map.get(key)
852 .map(|v| {
853 Timestamp::from_str(v, Some(&query_ctx.timezone()))
854 .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
855 })
856 .transpose()
857}
858
859fn idents_to_full_database_name(
860 obj_name: &ObjectName,
861 query_ctx: &QueryContextRef,
862) -> Result<(String, String)> {
863 match &obj_name.0[..] {
864 [database] => Ok((
865 query_ctx.current_catalog().to_owned(),
866 database.to_string_unquoted(),
867 )),
868 [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
869 _ => InvalidSqlSnafu {
870 err_msg: format!(
871 "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
872 ),
873 }
874 .fail(),
875 }
876}
877
878pub struct InserterImpl {
880 statement_executor: StatementExecutorRef,
881 options: Option<InsertOptions>,
882}
883
884impl InserterImpl {
885 pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
886 Self {
887 statement_executor,
888 options,
889 }
890 }
891}
892
893#[async_trait::async_trait]
894impl Inserter for InserterImpl {
895 async fn insert_rows(
896 &self,
897 context: &client::inserter::Context<'_>,
898 requests: RowInsertRequests,
899 ) -> ClientResult<()> {
900 let mut ctx_builder = QueryContextBuilder::default()
901 .current_catalog(context.catalog.to_string())
902 .current_schema(context.schema.to_string());
903 if let Some(options) = self.options.as_ref() {
904 ctx_builder = ctx_builder
905 .set_extension(
906 TTL_KEY.to_string(),
907 format_duration(options.ttl).to_string(),
908 )
909 .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
910 }
911 let query_ctx = ctx_builder.build().into();
912
913 self.statement_executor
914 .inserter()
915 .handle_row_inserts(
916 requests,
917 query_ctx,
918 self.statement_executor.as_ref(),
919 false,
920 false,
921 )
922 .await
923 .map_err(BoxedError::new)
924 .context(ClientExternalSnafu)
925 .map(|_| ())
926 }
927
928 fn set_options(&mut self, options: &InsertOptions) {
929 self.options = Some(*options);
930 }
931}
932
933#[cfg(test)]
934mod tests {
935 use std::assert_matches::assert_matches;
936 use std::collections::HashMap;
937
938 use common_time::range::TimestampRange;
939 use common_time::{Timestamp, Timezone};
940 use session::context::QueryContextBuilder;
941 use sql::statements::OptionMap;
942
943 use crate::error;
944 use crate::statement::copy_database::{
945 COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
946 };
947 use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};
948
949 fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
950 let query_ctx = QueryContextBuilder::default()
951 .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
952 .build()
953 .into();
954 let map = OptionMap::from(
955 [
956 (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
957 (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
958 ]
959 .into_iter()
960 .collect::<HashMap<_, _>>(),
961 );
962 timestamp_range_from_option_map(&map, &query_ctx)
963 }
964
965 #[test]
966 fn test_timestamp_range_from_option_map() {
967 assert_eq!(
968 Some(
969 TimestampRange::new(
970 Timestamp::new_second(1649635200),
971 Timestamp::new_second(1649664000),
972 )
973 .unwrap(),
974 ),
975 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00"),).unwrap()
976 );
977
978 assert_matches!(
979 check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
980 error::Error::InvalidTimestampRange { .. }
981 );
982 }
983
984 #[test]
985 fn test_verify_timestamp_format() {
986 let map = OptionMap::from(
987 [
988 (
989 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
990 "%Y-%m-%d %H:%M:%S".to_string(),
991 ),
992 (
993 common_datasource::file_format::FORMAT_TYPE.to_string(),
994 "csv".to_string(),
995 ),
996 ]
997 .into_iter()
998 .collect::<HashMap<_, _>>(),
999 );
1000 assert!(verify_time_related_format(&map).is_ok());
1001
1002 let map = OptionMap::from(
1003 [
1004 (
1005 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
1006 "%Y-%m-%d %H:%M:%S".to_string(),
1007 ),
1008 (
1009 common_datasource::file_format::FORMAT_TYPE.to_string(),
1010 "json".to_string(),
1011 ),
1012 ]
1013 .into_iter()
1014 .collect::<HashMap<_, _>>(),
1015 );
1016
1017 assert_matches!(
1018 verify_time_related_format(&map).unwrap_err(),
1019 error::Error::TimestampFormatNotSupported { .. }
1020 );
1021 let map = OptionMap::from(
1022 [
1023 (
1024 common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
1025 "%111112".to_string(),
1026 ),
1027 (
1028 common_datasource::file_format::FORMAT_TYPE.to_string(),
1029 "csv".to_string(),
1030 ),
1031 ]
1032 .into_iter()
1033 .collect::<HashMap<_, _>>(),
1034 );
1035
1036 assert_matches!(
1037 verify_time_related_format(&map).unwrap_err(),
1038 error::Error::InvalidCopyParameter { .. }
1039 );
1040 }
1041}