1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
22use api::v1::meta::ddl_task_request::Task;
23use api::v1::meta::{
24 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
25 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
26 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
27 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
28 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
29 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
30 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
31 DropViewTask as PbDropViewTask, Partition, ProcedureId,
32 TruncateTableTask as PbTruncateTableTask,
33};
34use api::v1::{
35 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
36 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
37 Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
38};
39use base64::engine::general_purpose;
40use base64::Engine as _;
41use common_time::{DatabaseTimeToLive, Timezone};
42use prost::Message;
43use serde::{Deserialize, Serialize};
44use serde_with::{serde_as, DefaultOnNull};
45use session::context::{QueryContextBuilder, QueryContextRef};
46use snafu::{OptionExt, ResultExt};
47use table::metadata::{RawTableInfo, TableId};
48use table::table_name::TableName;
49use table::table_reference::TableReference;
50
51use crate::error::{
52 self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
53 Result,
54};
55use crate::key::FlowId;
56
57#[derive(Debug, Clone)]
59pub enum DdlTask {
60 CreateTable(CreateTableTask),
61 DropTable(DropTableTask),
62 AlterTable(AlterTableTask),
63 TruncateTable(TruncateTableTask),
64 CreateLogicalTables(Vec<CreateTableTask>),
65 DropLogicalTables(Vec<DropTableTask>),
66 AlterLogicalTables(Vec<AlterTableTask>),
67 CreateDatabase(CreateDatabaseTask),
68 DropDatabase(DropDatabaseTask),
69 AlterDatabase(AlterDatabaseTask),
70 CreateFlow(CreateFlowTask),
71 DropFlow(DropFlowTask),
72 CreateView(CreateViewTask),
73 DropView(DropViewTask),
74 #[cfg(feature = "enterprise")]
75 CreateTrigger(trigger::CreateTriggerTask),
76}
77
78impl DdlTask {
79 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
81 DdlTask::CreateFlow(expr)
82 }
83
84 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
86 DdlTask::DropFlow(expr)
87 }
88
89 pub fn new_drop_view(expr: DropViewTask) -> Self {
91 DdlTask::DropView(expr)
92 }
93
94 pub fn new_create_table(
96 expr: CreateTableExpr,
97 partitions: Vec<Partition>,
98 table_info: RawTableInfo,
99 ) -> Self {
100 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
101 }
102
103 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
105 DdlTask::CreateLogicalTables(
106 table_data
107 .into_iter()
108 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
109 .collect(),
110 )
111 }
112
113 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
115 DdlTask::AlterLogicalTables(
116 table_data
117 .into_iter()
118 .map(|alter_table| AlterTableTask { alter_table })
119 .collect(),
120 )
121 }
122
123 pub fn new_drop_table(
125 catalog: String,
126 schema: String,
127 table: String,
128 table_id: TableId,
129 drop_if_exists: bool,
130 ) -> Self {
131 DdlTask::DropTable(DropTableTask {
132 catalog,
133 schema,
134 table,
135 table_id,
136 drop_if_exists,
137 })
138 }
139
140 pub fn new_create_database(
142 catalog: String,
143 schema: String,
144 create_if_not_exists: bool,
145 options: HashMap<String, String>,
146 ) -> Self {
147 DdlTask::CreateDatabase(CreateDatabaseTask {
148 catalog,
149 schema,
150 create_if_not_exists,
151 options,
152 })
153 }
154
155 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
157 DdlTask::DropDatabase(DropDatabaseTask {
158 catalog,
159 schema,
160 drop_if_exists,
161 })
162 }
163
164 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
166 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
167 }
168
169 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
171 DdlTask::AlterTable(AlterTableTask { alter_table })
172 }
173
174 pub fn new_truncate_table(
176 catalog: String,
177 schema: String,
178 table: String,
179 table_id: TableId,
180 ) -> Self {
181 DdlTask::TruncateTable(TruncateTableTask {
182 catalog,
183 schema,
184 table,
185 table_id,
186 })
187 }
188
189 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
191 DdlTask::CreateView(CreateViewTask {
192 create_view,
193 view_info,
194 })
195 }
196}
197
198impl TryFrom<Task> for DdlTask {
199 type Error = error::Error;
200 fn try_from(task: Task) -> Result<Self> {
201 match task {
202 Task::CreateTableTask(create_table) => {
203 Ok(DdlTask::CreateTable(create_table.try_into()?))
204 }
205 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
206 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
207 Task::TruncateTableTask(truncate_table) => {
208 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
209 }
210 Task::CreateTableTasks(create_tables) => {
211 let tasks = create_tables
212 .tasks
213 .into_iter()
214 .map(|task| task.try_into())
215 .collect::<Result<Vec<_>>>()?;
216
217 Ok(DdlTask::CreateLogicalTables(tasks))
218 }
219 Task::DropTableTasks(drop_tables) => {
220 let tasks = drop_tables
221 .tasks
222 .into_iter()
223 .map(|task| task.try_into())
224 .collect::<Result<Vec<_>>>()?;
225
226 Ok(DdlTask::DropLogicalTables(tasks))
227 }
228 Task::AlterTableTasks(alter_tables) => {
229 let tasks = alter_tables
230 .tasks
231 .into_iter()
232 .map(|task| task.try_into())
233 .collect::<Result<Vec<_>>>()?;
234
235 Ok(DdlTask::AlterLogicalTables(tasks))
236 }
237 Task::CreateDatabaseTask(create_database) => {
238 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
239 }
240 Task::DropDatabaseTask(drop_database) => {
241 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
242 }
243 Task::AlterDatabaseTask(alter_database) => {
244 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
245 }
246 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
247 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
248 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
249 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
250 Task::CreateTriggerTask(create_trigger) => {
251 #[cfg(feature = "enterprise")]
252 return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
253 #[cfg(not(feature = "enterprise"))]
254 {
255 let _ = create_trigger;
256 crate::error::UnsupportedSnafu {
257 operation: "create trigger",
258 }
259 .fail()
260 }
261 }
262 }
263 }
264}
265
266#[derive(Clone)]
267pub struct SubmitDdlTaskRequest {
268 pub query_context: QueryContextRef,
269 pub task: DdlTask,
270}
271
272impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
273 type Error = error::Error;
274
275 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
276 let task = match request.task {
277 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
278 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
279 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
280 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
281 DdlTask::CreateLogicalTables(tasks) => {
282 let tasks = tasks
283 .into_iter()
284 .map(|task| task.try_into())
285 .collect::<Result<Vec<_>>>()?;
286
287 Task::CreateTableTasks(PbCreateTableTasks { tasks })
288 }
289 DdlTask::DropLogicalTables(tasks) => {
290 let tasks = tasks
291 .into_iter()
292 .map(|task| task.into())
293 .collect::<Vec<_>>();
294
295 Task::DropTableTasks(PbDropTableTasks { tasks })
296 }
297 DdlTask::AlterLogicalTables(tasks) => {
298 let tasks = tasks
299 .into_iter()
300 .map(|task| task.try_into())
301 .collect::<Result<Vec<_>>>()?;
302
303 Task::AlterTableTasks(PbAlterTableTasks { tasks })
304 }
305 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
306 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
307 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
308 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
309 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
310 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
311 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
312 #[cfg(feature = "enterprise")]
313 DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
314 };
315
316 Ok(Self {
317 header: None,
318 query_context: Some((*request.query_context).clone().into()),
319 task: Some(task),
320 })
321 }
322}
323
324#[derive(Debug, Default)]
325pub struct SubmitDdlTaskResponse {
326 pub key: Vec<u8>,
327 pub table_ids: Vec<TableId>,
329}
330
331impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
332 type Error = error::Error;
333
334 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
335 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
336 Ok(Self {
337 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
338 table_ids,
339 })
340 }
341}
342
343impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
344 fn from(val: SubmitDdlTaskResponse) -> Self {
345 Self {
346 pid: Some(ProcedureId { key: val.key }),
347 table_ids: val
348 .table_ids
349 .into_iter()
350 .map(|id| api::v1::TableId { id })
351 .collect(),
352 ..Default::default()
353 }
354 }
355}
356
357#[derive(Debug, PartialEq, Clone)]
359pub struct CreateViewTask {
360 pub create_view: CreateViewExpr,
361 pub view_info: RawTableInfo,
362}
363
364impl CreateViewTask {
365 pub fn table_ref(&self) -> TableReference {
367 TableReference {
368 catalog: &self.create_view.catalog_name,
369 schema: &self.create_view.schema_name,
370 table: &self.create_view.view_name,
371 }
372 }
373
374 pub fn raw_logical_plan(&self) -> &Vec<u8> {
376 &self.create_view.logical_plan
377 }
378
379 pub fn view_definition(&self) -> &str {
381 &self.create_view.definition
382 }
383
384 pub fn table_names(&self) -> HashSet<TableName> {
386 self.create_view
387 .table_names
388 .iter()
389 .map(|t| t.clone().into())
390 .collect()
391 }
392
393 pub fn columns(&self) -> &Vec<String> {
395 &self.create_view.columns
396 }
397
398 pub fn plan_columns(&self) -> &Vec<String> {
400 &self.create_view.plan_columns
401 }
402}
403
404impl TryFrom<PbCreateViewTask> for CreateViewTask {
405 type Error = error::Error;
406
407 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
408 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
409
410 Ok(CreateViewTask {
411 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
412 err_msg: "expected create view",
413 })?,
414 view_info,
415 })
416 }
417}
418
419impl TryFrom<CreateViewTask> for PbCreateViewTask {
420 type Error = error::Error;
421
422 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
423 Ok(PbCreateViewTask {
424 create_view: Some(task.create_view),
425 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
426 })
427 }
428}
429
430impl Serialize for CreateViewTask {
431 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
432 where
433 S: serde::Serializer,
434 {
435 let view_info = serde_json::to_vec(&self.view_info)
436 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
437
438 let pb = PbCreateViewTask {
439 create_view: Some(self.create_view.clone()),
440 view_info,
441 };
442 let buf = pb.encode_to_vec();
443 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
444 serializer.serialize_str(&encoded)
445 }
446}
447
448impl<'de> Deserialize<'de> for CreateViewTask {
449 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
450 where
451 D: serde::Deserializer<'de>,
452 {
453 let encoded = String::deserialize(deserializer)?;
454 let buf = general_purpose::STANDARD_NO_PAD
455 .decode(encoded)
456 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
457 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
458 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
459
460 let expr = CreateViewTask::try_from(expr)
461 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
462
463 Ok(expr)
464 }
465}
466
467#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
469pub struct DropViewTask {
470 pub catalog: String,
471 pub schema: String,
472 pub view: String,
473 pub view_id: TableId,
474 pub drop_if_exists: bool,
475}
476
477impl DropViewTask {
478 pub fn table_ref(&self) -> TableReference {
480 TableReference {
481 catalog: &self.catalog,
482 schema: &self.schema,
483 table: &self.view,
484 }
485 }
486}
487
488impl TryFrom<PbDropViewTask> for DropViewTask {
489 type Error = error::Error;
490
491 fn try_from(pb: PbDropViewTask) -> Result<Self> {
492 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
493 err_msg: "expected drop view",
494 })?;
495
496 Ok(DropViewTask {
497 catalog: expr.catalog_name,
498 schema: expr.schema_name,
499 view: expr.view_name,
500 view_id: expr
501 .view_id
502 .context(error::InvalidProtoMsgSnafu {
503 err_msg: "expected view_id",
504 })?
505 .id,
506 drop_if_exists: expr.drop_if_exists,
507 })
508 }
509}
510
511impl From<DropViewTask> for PbDropViewTask {
512 fn from(task: DropViewTask) -> Self {
513 PbDropViewTask {
514 drop_view: Some(DropViewExpr {
515 catalog_name: task.catalog,
516 schema_name: task.schema,
517 view_name: task.view,
518 view_id: Some(api::v1::TableId { id: task.view_id }),
519 drop_if_exists: task.drop_if_exists,
520 }),
521 }
522 }
523}
524
525#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
526pub struct DropTableTask {
527 pub catalog: String,
528 pub schema: String,
529 pub table: String,
530 pub table_id: TableId,
531 #[serde(default)]
532 pub drop_if_exists: bool,
533}
534
535impl DropTableTask {
536 pub fn table_ref(&self) -> TableReference {
537 TableReference {
538 catalog: &self.catalog,
539 schema: &self.schema,
540 table: &self.table,
541 }
542 }
543
544 pub fn table_name(&self) -> TableName {
545 TableName {
546 catalog_name: self.catalog.to_string(),
547 schema_name: self.schema.to_string(),
548 table_name: self.table.to_string(),
549 }
550 }
551}
552
553impl TryFrom<PbDropTableTask> for DropTableTask {
554 type Error = error::Error;
555
556 fn try_from(pb: PbDropTableTask) -> Result<Self> {
557 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
558 err_msg: "expected drop table",
559 })?;
560
561 Ok(Self {
562 catalog: drop_table.catalog_name,
563 schema: drop_table.schema_name,
564 table: drop_table.table_name,
565 table_id: drop_table
566 .table_id
567 .context(error::InvalidProtoMsgSnafu {
568 err_msg: "expected table_id",
569 })?
570 .id,
571 drop_if_exists: drop_table.drop_if_exists,
572 })
573 }
574}
575
576impl From<DropTableTask> for PbDropTableTask {
577 fn from(task: DropTableTask) -> Self {
578 PbDropTableTask {
579 drop_table: Some(DropTableExpr {
580 catalog_name: task.catalog,
581 schema_name: task.schema,
582 table_name: task.table,
583 table_id: Some(api::v1::TableId { id: task.table_id }),
584 drop_if_exists: task.drop_if_exists,
585 }),
586 }
587 }
588}
589
590#[derive(Debug, PartialEq, Clone)]
591pub struct CreateTableTask {
592 pub create_table: CreateTableExpr,
593 pub partitions: Vec<Partition>,
594 pub table_info: RawTableInfo,
595}
596
597impl TryFrom<PbCreateTableTask> for CreateTableTask {
598 type Error = error::Error;
599
600 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
601 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
602
603 Ok(CreateTableTask::new(
604 pb.create_table.context(error::InvalidProtoMsgSnafu {
605 err_msg: "expected create table",
606 })?,
607 pb.partitions,
608 table_info,
609 ))
610 }
611}
612
613impl TryFrom<CreateTableTask> for PbCreateTableTask {
614 type Error = error::Error;
615
616 fn try_from(task: CreateTableTask) -> Result<Self> {
617 Ok(PbCreateTableTask {
618 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
619 create_table: Some(task.create_table),
620 partitions: task.partitions,
621 })
622 }
623}
624
625impl CreateTableTask {
626 pub fn new(
627 expr: CreateTableExpr,
628 partitions: Vec<Partition>,
629 table_info: RawTableInfo,
630 ) -> CreateTableTask {
631 CreateTableTask {
632 create_table: expr,
633 partitions,
634 table_info,
635 }
636 }
637
638 pub fn table_name(&self) -> TableName {
639 let table = &self.create_table;
640
641 TableName {
642 catalog_name: table.catalog_name.to_string(),
643 schema_name: table.schema_name.to_string(),
644 table_name: table.table_name.to_string(),
645 }
646 }
647
648 pub fn table_ref(&self) -> TableReference {
649 let table = &self.create_table;
650
651 TableReference {
652 catalog: &table.catalog_name,
653 schema: &table.schema_name,
654 table: &table.table_name,
655 }
656 }
657
658 pub fn set_table_id(&mut self, table_id: TableId) {
660 self.table_info.ident.table_id = table_id;
661 }
662
663 pub fn sort_columns(&mut self) {
668 self.create_table
671 .column_defs
672 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
673
674 self.table_info.sort_columns();
675 }
676}
677
678impl Serialize for CreateTableTask {
679 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
680 where
681 S: serde::Serializer,
682 {
683 let table_info = serde_json::to_vec(&self.table_info)
684 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
685
686 let pb = PbCreateTableTask {
687 create_table: Some(self.create_table.clone()),
688 partitions: self.partitions.clone(),
689 table_info,
690 };
691 let buf = pb.encode_to_vec();
692 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
693 serializer.serialize_str(&encoded)
694 }
695}
696
697impl<'de> Deserialize<'de> for CreateTableTask {
698 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
699 where
700 D: serde::Deserializer<'de>,
701 {
702 let encoded = String::deserialize(deserializer)?;
703 let buf = general_purpose::STANDARD_NO_PAD
704 .decode(encoded)
705 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
706 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
707 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
708
709 let expr = CreateTableTask::try_from(expr)
710 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
711
712 Ok(expr)
713 }
714}
715
716#[derive(Debug, PartialEq, Clone)]
717pub struct AlterTableTask {
718 pub alter_table: AlterTableExpr,
720}
721
722impl AlterTableTask {
723 pub fn validate(&self) -> Result<()> {
724 self.alter_table
725 .kind
726 .as_ref()
727 .context(error::UnexpectedSnafu {
728 err_msg: "'kind' is absent",
729 })?;
730 Ok(())
731 }
732
733 pub fn table_ref(&self) -> TableReference {
734 TableReference {
735 catalog: &self.alter_table.catalog_name,
736 schema: &self.alter_table.schema_name,
737 table: &self.alter_table.table_name,
738 }
739 }
740
741 pub fn table_name(&self) -> TableName {
742 let table = &self.alter_table;
743
744 TableName {
745 catalog_name: table.catalog_name.to_string(),
746 schema_name: table.schema_name.to_string(),
747 table_name: table.table_name.to_string(),
748 }
749 }
750}
751
752impl TryFrom<PbAlterTableTask> for AlterTableTask {
753 type Error = error::Error;
754
755 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
756 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
757 err_msg: "expected alter_table",
758 })?;
759
760 Ok(AlterTableTask { alter_table })
761 }
762}
763
764impl TryFrom<AlterTableTask> for PbAlterTableTask {
765 type Error = error::Error;
766
767 fn try_from(task: AlterTableTask) -> Result<Self> {
768 Ok(PbAlterTableTask {
769 alter_table: Some(task.alter_table),
770 })
771 }
772}
773
774impl Serialize for AlterTableTask {
775 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
776 where
777 S: serde::Serializer,
778 {
779 let pb = PbAlterTableTask {
780 alter_table: Some(self.alter_table.clone()),
781 };
782 let buf = pb.encode_to_vec();
783 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
784 serializer.serialize_str(&encoded)
785 }
786}
787
788impl<'de> Deserialize<'de> for AlterTableTask {
789 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
790 where
791 D: serde::Deserializer<'de>,
792 {
793 let encoded = String::deserialize(deserializer)?;
794 let buf = general_purpose::STANDARD_NO_PAD
795 .decode(encoded)
796 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
797 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
798 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
799
800 let expr = AlterTableTask::try_from(expr)
801 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
802
803 Ok(expr)
804 }
805}
806
807#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
808pub struct TruncateTableTask {
809 pub catalog: String,
810 pub schema: String,
811 pub table: String,
812 pub table_id: TableId,
813}
814
815impl TruncateTableTask {
816 pub fn table_ref(&self) -> TableReference {
817 TableReference {
818 catalog: &self.catalog,
819 schema: &self.schema,
820 table: &self.table,
821 }
822 }
823
824 pub fn table_name(&self) -> TableName {
825 TableName {
826 catalog_name: self.catalog.to_string(),
827 schema_name: self.schema.to_string(),
828 table_name: self.table.to_string(),
829 }
830 }
831}
832
833impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
834 type Error = error::Error;
835
836 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
837 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
838 err_msg: "expected truncate table",
839 })?;
840
841 Ok(Self {
842 catalog: truncate_table.catalog_name,
843 schema: truncate_table.schema_name,
844 table: truncate_table.table_name,
845 table_id: truncate_table
846 .table_id
847 .context(error::InvalidProtoMsgSnafu {
848 err_msg: "expected table_id",
849 })?
850 .id,
851 })
852 }
853}
854
855impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
856 type Error = error::Error;
857
858 fn try_from(task: TruncateTableTask) -> Result<Self> {
859 Ok(PbTruncateTableTask {
860 truncate_table: Some(TruncateTableExpr {
861 catalog_name: task.catalog,
862 schema_name: task.schema,
863 table_name: task.table,
864 table_id: Some(api::v1::TableId { id: task.table_id }),
865 }),
866 })
867 }
868}
869
870#[serde_as]
871#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
872pub struct CreateDatabaseTask {
873 pub catalog: String,
874 pub schema: String,
875 pub create_if_not_exists: bool,
876 #[serde_as(deserialize_as = "DefaultOnNull")]
877 pub options: HashMap<String, String>,
878}
879
880impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
881 type Error = error::Error;
882
883 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
884 let CreateDatabaseExpr {
885 catalog_name,
886 schema_name,
887 create_if_not_exists,
888 options,
889 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
890 err_msg: "expected create database",
891 })?;
892
893 Ok(CreateDatabaseTask {
894 catalog: catalog_name,
895 schema: schema_name,
896 create_if_not_exists,
897 options,
898 })
899 }
900}
901
902impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
903 type Error = error::Error;
904
905 fn try_from(
906 CreateDatabaseTask {
907 catalog,
908 schema,
909 create_if_not_exists,
910 options,
911 }: CreateDatabaseTask,
912 ) -> Result<Self> {
913 Ok(PbCreateDatabaseTask {
914 create_database: Some(CreateDatabaseExpr {
915 catalog_name: catalog,
916 schema_name: schema,
917 create_if_not_exists,
918 options,
919 }),
920 })
921 }
922}
923
924#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
925pub struct DropDatabaseTask {
926 pub catalog: String,
927 pub schema: String,
928 pub drop_if_exists: bool,
929}
930
931impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
932 type Error = error::Error;
933
934 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
935 let DropDatabaseExpr {
936 catalog_name,
937 schema_name,
938 drop_if_exists,
939 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
940 err_msg: "expected drop database",
941 })?;
942
943 Ok(DropDatabaseTask {
944 catalog: catalog_name,
945 schema: schema_name,
946 drop_if_exists,
947 })
948 }
949}
950
951impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
952 type Error = error::Error;
953
954 fn try_from(
955 DropDatabaseTask {
956 catalog,
957 schema,
958 drop_if_exists,
959 }: DropDatabaseTask,
960 ) -> Result<Self> {
961 Ok(PbDropDatabaseTask {
962 drop_database: Some(DropDatabaseExpr {
963 catalog_name: catalog,
964 schema_name: schema,
965 drop_if_exists,
966 }),
967 })
968 }
969}
970
971#[derive(Debug, PartialEq, Clone)]
972pub struct AlterDatabaseTask {
973 pub alter_expr: AlterDatabaseExpr,
974}
975
976impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
977 type Error = error::Error;
978
979 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
980 Ok(PbAlterDatabaseTask {
981 task: Some(task.alter_expr),
982 })
983 }
984}
985
986impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
987 type Error = error::Error;
988
989 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
990 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
991 err_msg: "expected alter database",
992 })?;
993
994 Ok(AlterDatabaseTask { alter_expr })
995 }
996}
997
998impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
999 type Error = error::Error;
1000
1001 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1002 match pb {
1003 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1004 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1005 options
1006 .set_database_options
1007 .into_iter()
1008 .map(SetDatabaseOption::try_from)
1009 .collect::<Result<Vec<_>>>()?,
1010 )))
1011 }
1012 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1013 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1014 options
1015 .keys
1016 .iter()
1017 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1018 .collect::<Result<Vec<_>>>()?,
1019 )),
1020 ),
1021 }
1022 }
1023}
1024
1025const TTL_KEY: &str = "ttl";
1026
1027impl TryFrom<PbOption> for SetDatabaseOption {
1028 type Error = error::Error;
1029
1030 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1031 match key.to_ascii_lowercase().as_str() {
1032 TTL_KEY => {
1033 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1034 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1035
1036 Ok(SetDatabaseOption::Ttl(ttl))
1037 }
1038 _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1039 }
1040 }
1041}
1042
1043#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1044pub enum SetDatabaseOption {
1045 Ttl(DatabaseTimeToLive),
1046}
1047
1048#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1049pub enum UnsetDatabaseOption {
1050 Ttl,
1051}
1052
1053impl TryFrom<&str> for UnsetDatabaseOption {
1054 type Error = error::Error;
1055
1056 fn try_from(key: &str) -> Result<Self> {
1057 match key.to_ascii_lowercase().as_str() {
1058 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1059 _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1060 }
1061 }
1062}
1063
1064#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1065pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1066
1067#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1068pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1069
1070#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1071pub enum AlterDatabaseKind {
1072 SetDatabaseOptions(SetDatabaseOptions),
1073 UnsetDatabaseOptions(UnsetDatabaseOptions),
1074}
1075
1076impl AlterDatabaseTask {
1077 pub fn catalog(&self) -> &str {
1078 &self.alter_expr.catalog_name
1079 }
1080
1081 pub fn schema(&self) -> &str {
1082 &self.alter_expr.catalog_name
1083 }
1084}
1085
1086#[derive(Debug, Clone, Serialize, Deserialize)]
1088pub struct CreateFlowTask {
1089 pub catalog_name: String,
1090 pub flow_name: String,
1091 pub source_table_names: Vec<TableName>,
1092 pub sink_table_name: TableName,
1093 pub or_replace: bool,
1094 pub create_if_not_exists: bool,
1095 pub expire_after: Option<i64>,
1097 pub comment: String,
1098 pub sql: String,
1099 pub flow_options: HashMap<String, String>,
1100}
1101
1102impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1103 type Error = error::Error;
1104
1105 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1106 let CreateFlowExpr {
1107 catalog_name,
1108 flow_name,
1109 source_table_names,
1110 sink_table_name,
1111 or_replace,
1112 create_if_not_exists,
1113 expire_after,
1114 comment,
1115 sql,
1116 flow_options,
1117 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1118 err_msg: "expected create_flow",
1119 })?;
1120
1121 Ok(CreateFlowTask {
1122 catalog_name,
1123 flow_name,
1124 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1125 sink_table_name: sink_table_name
1126 .context(error::InvalidProtoMsgSnafu {
1127 err_msg: "expected sink_table_name",
1128 })?
1129 .into(),
1130 or_replace,
1131 create_if_not_exists,
1132 expire_after: expire_after.map(|e| e.value),
1133 comment,
1134 sql,
1135 flow_options,
1136 })
1137 }
1138}
1139
1140impl From<CreateFlowTask> for PbCreateFlowTask {
1141 fn from(
1142 CreateFlowTask {
1143 catalog_name,
1144 flow_name,
1145 source_table_names,
1146 sink_table_name,
1147 or_replace,
1148 create_if_not_exists,
1149 expire_after,
1150 comment,
1151 sql,
1152 flow_options,
1153 }: CreateFlowTask,
1154 ) -> Self {
1155 PbCreateFlowTask {
1156 create_flow: Some(CreateFlowExpr {
1157 catalog_name,
1158 flow_name,
1159 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1160 sink_table_name: Some(sink_table_name.into()),
1161 or_replace,
1162 create_if_not_exists,
1163 expire_after: expire_after.map(|value| ExpireAfter { value }),
1164 comment,
1165 sql,
1166 flow_options,
1167 }),
1168 }
1169 }
1170}
1171
1172#[derive(Debug, Clone, Serialize, Deserialize)]
1174pub struct DropFlowTask {
1175 pub catalog_name: String,
1176 pub flow_name: String,
1177 pub flow_id: FlowId,
1178 pub drop_if_exists: bool,
1179}
1180
1181impl TryFrom<PbDropFlowTask> for DropFlowTask {
1182 type Error = error::Error;
1183
1184 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1185 let DropFlowExpr {
1186 catalog_name,
1187 flow_name,
1188 flow_id,
1189 drop_if_exists,
1190 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1191 err_msg: "expected drop_flow",
1192 })?;
1193 let flow_id = flow_id
1194 .context(error::InvalidProtoMsgSnafu {
1195 err_msg: "expected flow_id",
1196 })?
1197 .id;
1198 Ok(DropFlowTask {
1199 catalog_name,
1200 flow_name,
1201 flow_id,
1202 drop_if_exists,
1203 })
1204 }
1205}
1206
1207impl From<DropFlowTask> for PbDropFlowTask {
1208 fn from(
1209 DropFlowTask {
1210 catalog_name,
1211 flow_name,
1212 flow_id,
1213 drop_if_exists,
1214 }: DropFlowTask,
1215 ) -> Self {
1216 PbDropFlowTask {
1217 drop_flow: Some(DropFlowExpr {
1218 catalog_name,
1219 flow_name,
1220 flow_id: Some(api::v1::FlowId { id: flow_id }),
1221 drop_if_exists,
1222 }),
1223 }
1224 }
1225}
1226
1227#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1228pub struct QueryContext {
1229 current_catalog: String,
1230 current_schema: String,
1231 timezone: String,
1232 extensions: HashMap<String, String>,
1233 channel: u8,
1234}
1235
1236impl From<QueryContextRef> for QueryContext {
1237 fn from(query_context: QueryContextRef) -> Self {
1238 QueryContext {
1239 current_catalog: query_context.current_catalog().to_string(),
1240 current_schema: query_context.current_schema().to_string(),
1241 timezone: query_context.timezone().to_string(),
1242 extensions: query_context.extensions(),
1243 channel: query_context.channel() as u8,
1244 }
1245 }
1246}
1247
1248impl TryFrom<QueryContext> for session::context::QueryContext {
1249 type Error = error::Error;
1250 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1251 Ok(QueryContextBuilder::default()
1252 .current_catalog(value.current_catalog)
1253 .current_schema(value.current_schema)
1254 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1255 .extensions(value.extensions)
1256 .channel((value.channel as u32).into())
1257 .build())
1258 }
1259}
1260
1261impl From<QueryContext> for PbQueryContext {
1262 fn from(
1263 QueryContext {
1264 current_catalog,
1265 current_schema,
1266 timezone,
1267 extensions,
1268 channel,
1269 }: QueryContext,
1270 ) -> Self {
1271 PbQueryContext {
1272 current_catalog,
1273 current_schema,
1274 timezone,
1275 extensions,
1276 channel: channel as u32,
1277 snapshot_seqs: None,
1278 explain: None,
1279 }
1280 }
1281}
1282
1283#[cfg(test)]
1284mod tests {
1285 use std::sync::Arc;
1286
1287 use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1288 use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1289 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1290 use store_api::storage::ConcreteDataType;
1291 use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1292 use table::test_util::table_info::test_table_info;
1293
1294 use super::{AlterTableTask, CreateTableTask};
1295
1296 #[test]
1297 fn test_basic_ser_de_create_table_task() {
1298 let schema = SchemaBuilder::default().build().unwrap();
1299 let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1300 let task = CreateTableTask::new(
1301 CreateTableExpr::default(),
1302 Vec::new(),
1303 RawTableInfo::from(table_info),
1304 );
1305
1306 let output = serde_json::to_vec(&task).unwrap();
1307
1308 let de = serde_json::from_slice(&output).unwrap();
1309 assert_eq!(task, de);
1310 }
1311
1312 #[test]
1313 fn test_basic_ser_de_alter_table_task() {
1314 let task = AlterTableTask {
1315 alter_table: AlterTableExpr::default(),
1316 };
1317
1318 let output = serde_json::to_vec(&task).unwrap();
1319
1320 let de = serde_json::from_slice(&output).unwrap();
1321 assert_eq!(task, de);
1322 }
1323
1324 #[test]
1325 fn test_sort_columns() {
1326 let raw_schema = RawSchema {
1328 column_schemas: vec![
1329 ColumnSchema::new(
1330 "column3".to_string(),
1331 ConcreteDataType::string_datatype(),
1332 true,
1333 ),
1334 ColumnSchema::new(
1335 "column1".to_string(),
1336 ConcreteDataType::timestamp_millisecond_datatype(),
1337 false,
1338 )
1339 .with_time_index(true),
1340 ColumnSchema::new(
1341 "column2".to_string(),
1342 ConcreteDataType::float64_datatype(),
1343 true,
1344 ),
1345 ],
1346 timestamp_index: Some(1),
1347 version: 0,
1348 };
1349
1350 let raw_table_meta = RawTableMeta {
1352 schema: raw_schema,
1353 primary_key_indices: vec![0],
1354 value_indices: vec![2],
1355 engine: METRIC_ENGINE_NAME.to_string(),
1356 next_column_id: 0,
1357 region_numbers: vec![0],
1358 options: Default::default(),
1359 created_on: Default::default(),
1360 partition_key_indices: Default::default(),
1361 };
1362
1363 let raw_table_info = RawTableInfo {
1365 ident: Default::default(),
1366 meta: raw_table_meta,
1367 name: Default::default(),
1368 desc: Default::default(),
1369 catalog_name: Default::default(),
1370 schema_name: Default::default(),
1371 table_type: TableType::Base,
1372 };
1373
1374 let create_table_expr = CreateTableExpr {
1376 column_defs: vec![
1377 ColumnDef {
1378 name: "column3".to_string(),
1379 semantic_type: SemanticType::Tag as i32,
1380 ..Default::default()
1381 },
1382 ColumnDef {
1383 name: "column1".to_string(),
1384 semantic_type: SemanticType::Timestamp as i32,
1385 ..Default::default()
1386 },
1387 ColumnDef {
1388 name: "column2".to_string(),
1389 semantic_type: SemanticType::Field as i32,
1390 ..Default::default()
1391 },
1392 ],
1393 primary_keys: vec!["column3".to_string()],
1394 ..Default::default()
1395 };
1396
1397 let mut create_table_task =
1398 CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1399
1400 create_table_task.sort_columns();
1402
1403 assert_eq!(
1405 create_table_task.create_table.column_defs[0].name,
1406 "column1".to_string()
1407 );
1408 assert_eq!(
1409 create_table_task.create_table.column_defs[1].name,
1410 "column2".to_string()
1411 );
1412 assert_eq!(
1413 create_table_task.create_table.column_defs[2].name,
1414 "column3".to_string()
1415 );
1416
1417 assert_eq!(
1419 create_table_task.table_info.meta.schema.timestamp_index,
1420 Some(0)
1421 );
1422 assert_eq!(
1423 create_table_task.table_info.meta.primary_key_indices,
1424 vec![2]
1425 );
1426 assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
1427 }
1428}