1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
22use api::v1::meta::ddl_task_request::Task;
23use api::v1::meta::{
24 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
25 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
26 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
27 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
28 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
29 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
30 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
31 DropViewTask as PbDropViewTask, Partition, ProcedureId,
32 TruncateTableTask as PbTruncateTableTask,
33};
34use api::v1::{
35 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
36 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
37 Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
38};
39use base64::engine::general_purpose;
40use base64::Engine as _;
41use common_time::{DatabaseTimeToLive, Timezone};
42use prost::Message;
43use serde::{Deserialize, Serialize};
44use serde_with::{serde_as, DefaultOnNull};
45use session::context::{QueryContextBuilder, QueryContextRef};
46use snafu::{OptionExt, ResultExt};
47use table::metadata::{RawTableInfo, TableId};
48use table::table_name::TableName;
49use table::table_reference::TableReference;
50
51use crate::error::{
52 self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
53 Result,
54};
55use crate::key::FlowId;
56
57#[derive(Debug, Clone)]
59pub enum DdlTask {
60 CreateTable(CreateTableTask),
61 DropTable(DropTableTask),
62 AlterTable(AlterTableTask),
63 TruncateTable(TruncateTableTask),
64 CreateLogicalTables(Vec<CreateTableTask>),
65 DropLogicalTables(Vec<DropTableTask>),
66 AlterLogicalTables(Vec<AlterTableTask>),
67 CreateDatabase(CreateDatabaseTask),
68 DropDatabase(DropDatabaseTask),
69 AlterDatabase(AlterDatabaseTask),
70 CreateFlow(CreateFlowTask),
71 DropFlow(DropFlowTask),
72 #[cfg(feature = "enterprise")]
73 DropTrigger(trigger::DropTriggerTask),
74 CreateView(CreateViewTask),
75 DropView(DropViewTask),
76 #[cfg(feature = "enterprise")]
77 CreateTrigger(trigger::CreateTriggerTask),
78}
79
80impl DdlTask {
81 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
83 DdlTask::CreateFlow(expr)
84 }
85
86 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
88 DdlTask::DropFlow(expr)
89 }
90
91 pub fn new_drop_view(expr: DropViewTask) -> Self {
93 DdlTask::DropView(expr)
94 }
95
96 pub fn new_create_table(
98 expr: CreateTableExpr,
99 partitions: Vec<Partition>,
100 table_info: RawTableInfo,
101 ) -> Self {
102 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
103 }
104
105 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
107 DdlTask::CreateLogicalTables(
108 table_data
109 .into_iter()
110 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
111 .collect(),
112 )
113 }
114
115 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
117 DdlTask::AlterLogicalTables(
118 table_data
119 .into_iter()
120 .map(|alter_table| AlterTableTask { alter_table })
121 .collect(),
122 )
123 }
124
125 pub fn new_drop_table(
127 catalog: String,
128 schema: String,
129 table: String,
130 table_id: TableId,
131 drop_if_exists: bool,
132 ) -> Self {
133 DdlTask::DropTable(DropTableTask {
134 catalog,
135 schema,
136 table,
137 table_id,
138 drop_if_exists,
139 })
140 }
141
142 pub fn new_create_database(
144 catalog: String,
145 schema: String,
146 create_if_not_exists: bool,
147 options: HashMap<String, String>,
148 ) -> Self {
149 DdlTask::CreateDatabase(CreateDatabaseTask {
150 catalog,
151 schema,
152 create_if_not_exists,
153 options,
154 })
155 }
156
157 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
159 DdlTask::DropDatabase(DropDatabaseTask {
160 catalog,
161 schema,
162 drop_if_exists,
163 })
164 }
165
166 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
168 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
169 }
170
171 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
173 DdlTask::AlterTable(AlterTableTask { alter_table })
174 }
175
176 pub fn new_truncate_table(
178 catalog: String,
179 schema: String,
180 table: String,
181 table_id: TableId,
182 ) -> Self {
183 DdlTask::TruncateTable(TruncateTableTask {
184 catalog,
185 schema,
186 table,
187 table_id,
188 })
189 }
190
191 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
193 DdlTask::CreateView(CreateViewTask {
194 create_view,
195 view_info,
196 })
197 }
198}
199
200impl TryFrom<Task> for DdlTask {
201 type Error = error::Error;
202 fn try_from(task: Task) -> Result<Self> {
203 match task {
204 Task::CreateTableTask(create_table) => {
205 Ok(DdlTask::CreateTable(create_table.try_into()?))
206 }
207 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
208 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
209 Task::TruncateTableTask(truncate_table) => {
210 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
211 }
212 Task::CreateTableTasks(create_tables) => {
213 let tasks = create_tables
214 .tasks
215 .into_iter()
216 .map(|task| task.try_into())
217 .collect::<Result<Vec<_>>>()?;
218
219 Ok(DdlTask::CreateLogicalTables(tasks))
220 }
221 Task::DropTableTasks(drop_tables) => {
222 let tasks = drop_tables
223 .tasks
224 .into_iter()
225 .map(|task| task.try_into())
226 .collect::<Result<Vec<_>>>()?;
227
228 Ok(DdlTask::DropLogicalTables(tasks))
229 }
230 Task::AlterTableTasks(alter_tables) => {
231 let tasks = alter_tables
232 .tasks
233 .into_iter()
234 .map(|task| task.try_into())
235 .collect::<Result<Vec<_>>>()?;
236
237 Ok(DdlTask::AlterLogicalTables(tasks))
238 }
239 Task::CreateDatabaseTask(create_database) => {
240 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
241 }
242 Task::DropDatabaseTask(drop_database) => {
243 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
244 }
245 Task::AlterDatabaseTask(alter_database) => {
246 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
247 }
248 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
249 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
250 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
251 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
252 Task::CreateTriggerTask(create_trigger) => {
253 #[cfg(feature = "enterprise")]
254 return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
255 #[cfg(not(feature = "enterprise"))]
256 {
257 let _ = create_trigger;
258 crate::error::UnsupportedSnafu {
259 operation: "create trigger",
260 }
261 .fail()
262 }
263 }
264 Task::DropTriggerTask(drop_trigger) => {
265 #[cfg(feature = "enterprise")]
266 return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
267 #[cfg(not(feature = "enterprise"))]
268 {
269 let _ = drop_trigger;
270 crate::error::UnsupportedSnafu {
271 operation: "drop trigger",
272 }
273 .fail()
274 }
275 }
276 }
277 }
278}
279
280#[derive(Clone)]
281pub struct SubmitDdlTaskRequest {
282 pub query_context: QueryContextRef,
283 pub task: DdlTask,
284}
285
286impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
287 type Error = error::Error;
288
289 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
290 let task = match request.task {
291 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
292 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
293 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
294 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
295 DdlTask::CreateLogicalTables(tasks) => {
296 let tasks = tasks
297 .into_iter()
298 .map(|task| task.try_into())
299 .collect::<Result<Vec<_>>>()?;
300
301 Task::CreateTableTasks(PbCreateTableTasks { tasks })
302 }
303 DdlTask::DropLogicalTables(tasks) => {
304 let tasks = tasks
305 .into_iter()
306 .map(|task| task.into())
307 .collect::<Vec<_>>();
308
309 Task::DropTableTasks(PbDropTableTasks { tasks })
310 }
311 DdlTask::AlterLogicalTables(tasks) => {
312 let tasks = tasks
313 .into_iter()
314 .map(|task| task.try_into())
315 .collect::<Result<Vec<_>>>()?;
316
317 Task::AlterTableTasks(PbAlterTableTasks { tasks })
318 }
319 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
320 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
321 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
322 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
323 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
324 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
325 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
326 #[cfg(feature = "enterprise")]
327 DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
328 #[cfg(feature = "enterprise")]
329 DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
330 };
331
332 Ok(Self {
333 header: None,
334 query_context: Some((*request.query_context).clone().into()),
335 task: Some(task),
336 })
337 }
338}
339
340#[derive(Debug, Default)]
341pub struct SubmitDdlTaskResponse {
342 pub key: Vec<u8>,
343 pub table_ids: Vec<TableId>,
345}
346
347impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
348 type Error = error::Error;
349
350 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
351 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
352 Ok(Self {
353 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
354 table_ids,
355 })
356 }
357}
358
359impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
360 fn from(val: SubmitDdlTaskResponse) -> Self {
361 Self {
362 pid: Some(ProcedureId { key: val.key }),
363 table_ids: val
364 .table_ids
365 .into_iter()
366 .map(|id| api::v1::TableId { id })
367 .collect(),
368 ..Default::default()
369 }
370 }
371}
372
373#[derive(Debug, PartialEq, Clone)]
375pub struct CreateViewTask {
376 pub create_view: CreateViewExpr,
377 pub view_info: RawTableInfo,
378}
379
380impl CreateViewTask {
381 pub fn table_ref(&self) -> TableReference {
383 TableReference {
384 catalog: &self.create_view.catalog_name,
385 schema: &self.create_view.schema_name,
386 table: &self.create_view.view_name,
387 }
388 }
389
390 pub fn raw_logical_plan(&self) -> &Vec<u8> {
392 &self.create_view.logical_plan
393 }
394
395 pub fn view_definition(&self) -> &str {
397 &self.create_view.definition
398 }
399
400 pub fn table_names(&self) -> HashSet<TableName> {
402 self.create_view
403 .table_names
404 .iter()
405 .map(|t| t.clone().into())
406 .collect()
407 }
408
409 pub fn columns(&self) -> &Vec<String> {
411 &self.create_view.columns
412 }
413
414 pub fn plan_columns(&self) -> &Vec<String> {
416 &self.create_view.plan_columns
417 }
418}
419
420impl TryFrom<PbCreateViewTask> for CreateViewTask {
421 type Error = error::Error;
422
423 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
424 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
425
426 Ok(CreateViewTask {
427 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
428 err_msg: "expected create view",
429 })?,
430 view_info,
431 })
432 }
433}
434
435impl TryFrom<CreateViewTask> for PbCreateViewTask {
436 type Error = error::Error;
437
438 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
439 Ok(PbCreateViewTask {
440 create_view: Some(task.create_view),
441 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
442 })
443 }
444}
445
446impl Serialize for CreateViewTask {
447 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
448 where
449 S: serde::Serializer,
450 {
451 let view_info = serde_json::to_vec(&self.view_info)
452 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
453
454 let pb = PbCreateViewTask {
455 create_view: Some(self.create_view.clone()),
456 view_info,
457 };
458 let buf = pb.encode_to_vec();
459 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
460 serializer.serialize_str(&encoded)
461 }
462}
463
464impl<'de> Deserialize<'de> for CreateViewTask {
465 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
466 where
467 D: serde::Deserializer<'de>,
468 {
469 let encoded = String::deserialize(deserializer)?;
470 let buf = general_purpose::STANDARD_NO_PAD
471 .decode(encoded)
472 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
473 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
474 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
475
476 let expr = CreateViewTask::try_from(expr)
477 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
478
479 Ok(expr)
480 }
481}
482
483#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
485pub struct DropViewTask {
486 pub catalog: String,
487 pub schema: String,
488 pub view: String,
489 pub view_id: TableId,
490 pub drop_if_exists: bool,
491}
492
493impl DropViewTask {
494 pub fn table_ref(&self) -> TableReference {
496 TableReference {
497 catalog: &self.catalog,
498 schema: &self.schema,
499 table: &self.view,
500 }
501 }
502}
503
504impl TryFrom<PbDropViewTask> for DropViewTask {
505 type Error = error::Error;
506
507 fn try_from(pb: PbDropViewTask) -> Result<Self> {
508 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
509 err_msg: "expected drop view",
510 })?;
511
512 Ok(DropViewTask {
513 catalog: expr.catalog_name,
514 schema: expr.schema_name,
515 view: expr.view_name,
516 view_id: expr
517 .view_id
518 .context(error::InvalidProtoMsgSnafu {
519 err_msg: "expected view_id",
520 })?
521 .id,
522 drop_if_exists: expr.drop_if_exists,
523 })
524 }
525}
526
527impl From<DropViewTask> for PbDropViewTask {
528 fn from(task: DropViewTask) -> Self {
529 PbDropViewTask {
530 drop_view: Some(DropViewExpr {
531 catalog_name: task.catalog,
532 schema_name: task.schema,
533 view_name: task.view,
534 view_id: Some(api::v1::TableId { id: task.view_id }),
535 drop_if_exists: task.drop_if_exists,
536 }),
537 }
538 }
539}
540
541#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
542pub struct DropTableTask {
543 pub catalog: String,
544 pub schema: String,
545 pub table: String,
546 pub table_id: TableId,
547 #[serde(default)]
548 pub drop_if_exists: bool,
549}
550
551impl DropTableTask {
552 pub fn table_ref(&self) -> TableReference {
553 TableReference {
554 catalog: &self.catalog,
555 schema: &self.schema,
556 table: &self.table,
557 }
558 }
559
560 pub fn table_name(&self) -> TableName {
561 TableName {
562 catalog_name: self.catalog.to_string(),
563 schema_name: self.schema.to_string(),
564 table_name: self.table.to_string(),
565 }
566 }
567}
568
569impl TryFrom<PbDropTableTask> for DropTableTask {
570 type Error = error::Error;
571
572 fn try_from(pb: PbDropTableTask) -> Result<Self> {
573 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
574 err_msg: "expected drop table",
575 })?;
576
577 Ok(Self {
578 catalog: drop_table.catalog_name,
579 schema: drop_table.schema_name,
580 table: drop_table.table_name,
581 table_id: drop_table
582 .table_id
583 .context(error::InvalidProtoMsgSnafu {
584 err_msg: "expected table_id",
585 })?
586 .id,
587 drop_if_exists: drop_table.drop_if_exists,
588 })
589 }
590}
591
592impl From<DropTableTask> for PbDropTableTask {
593 fn from(task: DropTableTask) -> Self {
594 PbDropTableTask {
595 drop_table: Some(DropTableExpr {
596 catalog_name: task.catalog,
597 schema_name: task.schema,
598 table_name: task.table,
599 table_id: Some(api::v1::TableId { id: task.table_id }),
600 drop_if_exists: task.drop_if_exists,
601 }),
602 }
603 }
604}
605
606#[derive(Debug, PartialEq, Clone)]
607pub struct CreateTableTask {
608 pub create_table: CreateTableExpr,
609 pub partitions: Vec<Partition>,
610 pub table_info: RawTableInfo,
611}
612
613impl TryFrom<PbCreateTableTask> for CreateTableTask {
614 type Error = error::Error;
615
616 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
617 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
618
619 Ok(CreateTableTask::new(
620 pb.create_table.context(error::InvalidProtoMsgSnafu {
621 err_msg: "expected create table",
622 })?,
623 pb.partitions,
624 table_info,
625 ))
626 }
627}
628
629impl TryFrom<CreateTableTask> for PbCreateTableTask {
630 type Error = error::Error;
631
632 fn try_from(task: CreateTableTask) -> Result<Self> {
633 Ok(PbCreateTableTask {
634 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
635 create_table: Some(task.create_table),
636 partitions: task.partitions,
637 })
638 }
639}
640
641impl CreateTableTask {
642 pub fn new(
643 expr: CreateTableExpr,
644 partitions: Vec<Partition>,
645 table_info: RawTableInfo,
646 ) -> CreateTableTask {
647 CreateTableTask {
648 create_table: expr,
649 partitions,
650 table_info,
651 }
652 }
653
654 pub fn table_name(&self) -> TableName {
655 let table = &self.create_table;
656
657 TableName {
658 catalog_name: table.catalog_name.to_string(),
659 schema_name: table.schema_name.to_string(),
660 table_name: table.table_name.to_string(),
661 }
662 }
663
664 pub fn table_ref(&self) -> TableReference {
665 let table = &self.create_table;
666
667 TableReference {
668 catalog: &table.catalog_name,
669 schema: &table.schema_name,
670 table: &table.table_name,
671 }
672 }
673
674 pub fn set_table_id(&mut self, table_id: TableId) {
676 self.table_info.ident.table_id = table_id;
677 }
678
679 pub fn sort_columns(&mut self) {
684 self.create_table
687 .column_defs
688 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
689
690 self.table_info.sort_columns();
691 }
692}
693
694impl Serialize for CreateTableTask {
695 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
696 where
697 S: serde::Serializer,
698 {
699 let table_info = serde_json::to_vec(&self.table_info)
700 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
701
702 let pb = PbCreateTableTask {
703 create_table: Some(self.create_table.clone()),
704 partitions: self.partitions.clone(),
705 table_info,
706 };
707 let buf = pb.encode_to_vec();
708 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
709 serializer.serialize_str(&encoded)
710 }
711}
712
713impl<'de> Deserialize<'de> for CreateTableTask {
714 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
715 where
716 D: serde::Deserializer<'de>,
717 {
718 let encoded = String::deserialize(deserializer)?;
719 let buf = general_purpose::STANDARD_NO_PAD
720 .decode(encoded)
721 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
722 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
723 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
724
725 let expr = CreateTableTask::try_from(expr)
726 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
727
728 Ok(expr)
729 }
730}
731
732#[derive(Debug, PartialEq, Clone)]
733pub struct AlterTableTask {
734 pub alter_table: AlterTableExpr,
736}
737
738impl AlterTableTask {
739 pub fn validate(&self) -> Result<()> {
740 self.alter_table
741 .kind
742 .as_ref()
743 .context(error::UnexpectedSnafu {
744 err_msg: "'kind' is absent",
745 })?;
746 Ok(())
747 }
748
749 pub fn table_ref(&self) -> TableReference {
750 TableReference {
751 catalog: &self.alter_table.catalog_name,
752 schema: &self.alter_table.schema_name,
753 table: &self.alter_table.table_name,
754 }
755 }
756
757 pub fn table_name(&self) -> TableName {
758 let table = &self.alter_table;
759
760 TableName {
761 catalog_name: table.catalog_name.to_string(),
762 schema_name: table.schema_name.to_string(),
763 table_name: table.table_name.to_string(),
764 }
765 }
766}
767
768impl TryFrom<PbAlterTableTask> for AlterTableTask {
769 type Error = error::Error;
770
771 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
772 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
773 err_msg: "expected alter_table",
774 })?;
775
776 Ok(AlterTableTask { alter_table })
777 }
778}
779
780impl TryFrom<AlterTableTask> for PbAlterTableTask {
781 type Error = error::Error;
782
783 fn try_from(task: AlterTableTask) -> Result<Self> {
784 Ok(PbAlterTableTask {
785 alter_table: Some(task.alter_table),
786 })
787 }
788}
789
790impl Serialize for AlterTableTask {
791 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
792 where
793 S: serde::Serializer,
794 {
795 let pb = PbAlterTableTask {
796 alter_table: Some(self.alter_table.clone()),
797 };
798 let buf = pb.encode_to_vec();
799 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
800 serializer.serialize_str(&encoded)
801 }
802}
803
804impl<'de> Deserialize<'de> for AlterTableTask {
805 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
806 where
807 D: serde::Deserializer<'de>,
808 {
809 let encoded = String::deserialize(deserializer)?;
810 let buf = general_purpose::STANDARD_NO_PAD
811 .decode(encoded)
812 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
813 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
814 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
815
816 let expr = AlterTableTask::try_from(expr)
817 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
818
819 Ok(expr)
820 }
821}
822
823#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
824pub struct TruncateTableTask {
825 pub catalog: String,
826 pub schema: String,
827 pub table: String,
828 pub table_id: TableId,
829}
830
831impl TruncateTableTask {
832 pub fn table_ref(&self) -> TableReference {
833 TableReference {
834 catalog: &self.catalog,
835 schema: &self.schema,
836 table: &self.table,
837 }
838 }
839
840 pub fn table_name(&self) -> TableName {
841 TableName {
842 catalog_name: self.catalog.to_string(),
843 schema_name: self.schema.to_string(),
844 table_name: self.table.to_string(),
845 }
846 }
847}
848
849impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
850 type Error = error::Error;
851
852 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
853 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
854 err_msg: "expected truncate table",
855 })?;
856
857 Ok(Self {
858 catalog: truncate_table.catalog_name,
859 schema: truncate_table.schema_name,
860 table: truncate_table.table_name,
861 table_id: truncate_table
862 .table_id
863 .context(error::InvalidProtoMsgSnafu {
864 err_msg: "expected table_id",
865 })?
866 .id,
867 })
868 }
869}
870
871impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
872 type Error = error::Error;
873
874 fn try_from(task: TruncateTableTask) -> Result<Self> {
875 Ok(PbTruncateTableTask {
876 truncate_table: Some(TruncateTableExpr {
877 catalog_name: task.catalog,
878 schema_name: task.schema,
879 table_name: task.table,
880 table_id: Some(api::v1::TableId { id: task.table_id }),
881 }),
882 })
883 }
884}
885
886#[serde_as]
887#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
888pub struct CreateDatabaseTask {
889 pub catalog: String,
890 pub schema: String,
891 pub create_if_not_exists: bool,
892 #[serde_as(deserialize_as = "DefaultOnNull")]
893 pub options: HashMap<String, String>,
894}
895
896impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
897 type Error = error::Error;
898
899 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
900 let CreateDatabaseExpr {
901 catalog_name,
902 schema_name,
903 create_if_not_exists,
904 options,
905 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
906 err_msg: "expected create database",
907 })?;
908
909 Ok(CreateDatabaseTask {
910 catalog: catalog_name,
911 schema: schema_name,
912 create_if_not_exists,
913 options,
914 })
915 }
916}
917
918impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
919 type Error = error::Error;
920
921 fn try_from(
922 CreateDatabaseTask {
923 catalog,
924 schema,
925 create_if_not_exists,
926 options,
927 }: CreateDatabaseTask,
928 ) -> Result<Self> {
929 Ok(PbCreateDatabaseTask {
930 create_database: Some(CreateDatabaseExpr {
931 catalog_name: catalog,
932 schema_name: schema,
933 create_if_not_exists,
934 options,
935 }),
936 })
937 }
938}
939
940#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
941pub struct DropDatabaseTask {
942 pub catalog: String,
943 pub schema: String,
944 pub drop_if_exists: bool,
945}
946
947impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
948 type Error = error::Error;
949
950 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
951 let DropDatabaseExpr {
952 catalog_name,
953 schema_name,
954 drop_if_exists,
955 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
956 err_msg: "expected drop database",
957 })?;
958
959 Ok(DropDatabaseTask {
960 catalog: catalog_name,
961 schema: schema_name,
962 drop_if_exists,
963 })
964 }
965}
966
967impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
968 type Error = error::Error;
969
970 fn try_from(
971 DropDatabaseTask {
972 catalog,
973 schema,
974 drop_if_exists,
975 }: DropDatabaseTask,
976 ) -> Result<Self> {
977 Ok(PbDropDatabaseTask {
978 drop_database: Some(DropDatabaseExpr {
979 catalog_name: catalog,
980 schema_name: schema,
981 drop_if_exists,
982 }),
983 })
984 }
985}
986
987#[derive(Debug, PartialEq, Clone)]
988pub struct AlterDatabaseTask {
989 pub alter_expr: AlterDatabaseExpr,
990}
991
992impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
993 type Error = error::Error;
994
995 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
996 Ok(PbAlterDatabaseTask {
997 task: Some(task.alter_expr),
998 })
999 }
1000}
1001
1002impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1003 type Error = error::Error;
1004
1005 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1006 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1007 err_msg: "expected alter database",
1008 })?;
1009
1010 Ok(AlterDatabaseTask { alter_expr })
1011 }
1012}
1013
1014impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1015 type Error = error::Error;
1016
1017 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1018 match pb {
1019 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1020 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1021 options
1022 .set_database_options
1023 .into_iter()
1024 .map(SetDatabaseOption::try_from)
1025 .collect::<Result<Vec<_>>>()?,
1026 )))
1027 }
1028 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1029 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1030 options
1031 .keys
1032 .iter()
1033 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1034 .collect::<Result<Vec<_>>>()?,
1035 )),
1036 ),
1037 }
1038 }
1039}
1040
1041const TTL_KEY: &str = "ttl";
1042
1043impl TryFrom<PbOption> for SetDatabaseOption {
1044 type Error = error::Error;
1045
1046 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1047 match key.to_ascii_lowercase().as_str() {
1048 TTL_KEY => {
1049 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1050 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1051
1052 Ok(SetDatabaseOption::Ttl(ttl))
1053 }
1054 _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1055 }
1056 }
1057}
1058
1059#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1060pub enum SetDatabaseOption {
1061 Ttl(DatabaseTimeToLive),
1062}
1063
1064#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1065pub enum UnsetDatabaseOption {
1066 Ttl,
1067}
1068
1069impl TryFrom<&str> for UnsetDatabaseOption {
1070 type Error = error::Error;
1071
1072 fn try_from(key: &str) -> Result<Self> {
1073 match key.to_ascii_lowercase().as_str() {
1074 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1075 _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1076 }
1077 }
1078}
1079
1080#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1081pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1082
1083#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1084pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1085
1086#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1087pub enum AlterDatabaseKind {
1088 SetDatabaseOptions(SetDatabaseOptions),
1089 UnsetDatabaseOptions(UnsetDatabaseOptions),
1090}
1091
1092impl AlterDatabaseTask {
1093 pub fn catalog(&self) -> &str {
1094 &self.alter_expr.catalog_name
1095 }
1096
1097 pub fn schema(&self) -> &str {
1098 &self.alter_expr.catalog_name
1099 }
1100}
1101
1102#[derive(Debug, Clone, Serialize, Deserialize)]
1104pub struct CreateFlowTask {
1105 pub catalog_name: String,
1106 pub flow_name: String,
1107 pub source_table_names: Vec<TableName>,
1108 pub sink_table_name: TableName,
1109 pub or_replace: bool,
1110 pub create_if_not_exists: bool,
1111 pub expire_after: Option<i64>,
1113 pub comment: String,
1114 pub sql: String,
1115 pub flow_options: HashMap<String, String>,
1116}
1117
1118impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1119 type Error = error::Error;
1120
1121 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1122 let CreateFlowExpr {
1123 catalog_name,
1124 flow_name,
1125 source_table_names,
1126 sink_table_name,
1127 or_replace,
1128 create_if_not_exists,
1129 expire_after,
1130 comment,
1131 sql,
1132 flow_options,
1133 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1134 err_msg: "expected create_flow",
1135 })?;
1136
1137 Ok(CreateFlowTask {
1138 catalog_name,
1139 flow_name,
1140 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1141 sink_table_name: sink_table_name
1142 .context(error::InvalidProtoMsgSnafu {
1143 err_msg: "expected sink_table_name",
1144 })?
1145 .into(),
1146 or_replace,
1147 create_if_not_exists,
1148 expire_after: expire_after.map(|e| e.value),
1149 comment,
1150 sql,
1151 flow_options,
1152 })
1153 }
1154}
1155
1156impl From<CreateFlowTask> for PbCreateFlowTask {
1157 fn from(
1158 CreateFlowTask {
1159 catalog_name,
1160 flow_name,
1161 source_table_names,
1162 sink_table_name,
1163 or_replace,
1164 create_if_not_exists,
1165 expire_after,
1166 comment,
1167 sql,
1168 flow_options,
1169 }: CreateFlowTask,
1170 ) -> Self {
1171 PbCreateFlowTask {
1172 create_flow: Some(CreateFlowExpr {
1173 catalog_name,
1174 flow_name,
1175 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1176 sink_table_name: Some(sink_table_name.into()),
1177 or_replace,
1178 create_if_not_exists,
1179 expire_after: expire_after.map(|value| ExpireAfter { value }),
1180 comment,
1181 sql,
1182 flow_options,
1183 }),
1184 }
1185 }
1186}
1187
1188#[derive(Debug, Clone, Serialize, Deserialize)]
1190pub struct DropFlowTask {
1191 pub catalog_name: String,
1192 pub flow_name: String,
1193 pub flow_id: FlowId,
1194 pub drop_if_exists: bool,
1195}
1196
1197impl TryFrom<PbDropFlowTask> for DropFlowTask {
1198 type Error = error::Error;
1199
1200 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1201 let DropFlowExpr {
1202 catalog_name,
1203 flow_name,
1204 flow_id,
1205 drop_if_exists,
1206 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1207 err_msg: "expected drop_flow",
1208 })?;
1209 let flow_id = flow_id
1210 .context(error::InvalidProtoMsgSnafu {
1211 err_msg: "expected flow_id",
1212 })?
1213 .id;
1214 Ok(DropFlowTask {
1215 catalog_name,
1216 flow_name,
1217 flow_id,
1218 drop_if_exists,
1219 })
1220 }
1221}
1222
1223impl From<DropFlowTask> for PbDropFlowTask {
1224 fn from(
1225 DropFlowTask {
1226 catalog_name,
1227 flow_name,
1228 flow_id,
1229 drop_if_exists,
1230 }: DropFlowTask,
1231 ) -> Self {
1232 PbDropFlowTask {
1233 drop_flow: Some(DropFlowExpr {
1234 catalog_name,
1235 flow_name,
1236 flow_id: Some(api::v1::FlowId { id: flow_id }),
1237 drop_if_exists,
1238 }),
1239 }
1240 }
1241}
1242
1243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1244pub struct QueryContext {
1245 current_catalog: String,
1246 current_schema: String,
1247 timezone: String,
1248 extensions: HashMap<String, String>,
1249 channel: u8,
1250}
1251
1252impl From<QueryContextRef> for QueryContext {
1253 fn from(query_context: QueryContextRef) -> Self {
1254 QueryContext {
1255 current_catalog: query_context.current_catalog().to_string(),
1256 current_schema: query_context.current_schema().to_string(),
1257 timezone: query_context.timezone().to_string(),
1258 extensions: query_context.extensions(),
1259 channel: query_context.channel() as u8,
1260 }
1261 }
1262}
1263
1264impl TryFrom<QueryContext> for session::context::QueryContext {
1265 type Error = error::Error;
1266 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1267 Ok(QueryContextBuilder::default()
1268 .current_catalog(value.current_catalog)
1269 .current_schema(value.current_schema)
1270 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1271 .extensions(value.extensions)
1272 .channel((value.channel as u32).into())
1273 .build())
1274 }
1275}
1276
1277impl From<QueryContext> for PbQueryContext {
1278 fn from(
1279 QueryContext {
1280 current_catalog,
1281 current_schema,
1282 timezone,
1283 extensions,
1284 channel,
1285 }: QueryContext,
1286 ) -> Self {
1287 PbQueryContext {
1288 current_catalog,
1289 current_schema,
1290 timezone,
1291 extensions,
1292 channel: channel as u32,
1293 snapshot_seqs: None,
1294 explain: None,
1295 }
1296 }
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use std::sync::Arc;
1302
1303 use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1304 use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1305 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1306 use store_api::storage::ConcreteDataType;
1307 use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1308 use table::test_util::table_info::test_table_info;
1309
1310 use super::{AlterTableTask, CreateTableTask};
1311
1312 #[test]
1313 fn test_basic_ser_de_create_table_task() {
1314 let schema = SchemaBuilder::default().build().unwrap();
1315 let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1316 let task = CreateTableTask::new(
1317 CreateTableExpr::default(),
1318 Vec::new(),
1319 RawTableInfo::from(table_info),
1320 );
1321
1322 let output = serde_json::to_vec(&task).unwrap();
1323
1324 let de = serde_json::from_slice(&output).unwrap();
1325 assert_eq!(task, de);
1326 }
1327
1328 #[test]
1329 fn test_basic_ser_de_alter_table_task() {
1330 let task = AlterTableTask {
1331 alter_table: AlterTableExpr::default(),
1332 };
1333
1334 let output = serde_json::to_vec(&task).unwrap();
1335
1336 let de = serde_json::from_slice(&output).unwrap();
1337 assert_eq!(task, de);
1338 }
1339
1340 #[test]
1341 fn test_sort_columns() {
1342 let raw_schema = RawSchema {
1344 column_schemas: vec![
1345 ColumnSchema::new(
1346 "column3".to_string(),
1347 ConcreteDataType::string_datatype(),
1348 true,
1349 ),
1350 ColumnSchema::new(
1351 "column1".to_string(),
1352 ConcreteDataType::timestamp_millisecond_datatype(),
1353 false,
1354 )
1355 .with_time_index(true),
1356 ColumnSchema::new(
1357 "column2".to_string(),
1358 ConcreteDataType::float64_datatype(),
1359 true,
1360 ),
1361 ],
1362 timestamp_index: Some(1),
1363 version: 0,
1364 };
1365
1366 let raw_table_meta = RawTableMeta {
1368 schema: raw_schema,
1369 primary_key_indices: vec![0],
1370 value_indices: vec![2],
1371 engine: METRIC_ENGINE_NAME.to_string(),
1372 next_column_id: 0,
1373 region_numbers: vec![0],
1374 options: Default::default(),
1375 created_on: Default::default(),
1376 partition_key_indices: Default::default(),
1377 };
1378
1379 let raw_table_info = RawTableInfo {
1381 ident: Default::default(),
1382 meta: raw_table_meta,
1383 name: Default::default(),
1384 desc: Default::default(),
1385 catalog_name: Default::default(),
1386 schema_name: Default::default(),
1387 table_type: TableType::Base,
1388 };
1389
1390 let create_table_expr = CreateTableExpr {
1392 column_defs: vec![
1393 ColumnDef {
1394 name: "column3".to_string(),
1395 semantic_type: SemanticType::Tag as i32,
1396 ..Default::default()
1397 },
1398 ColumnDef {
1399 name: "column1".to_string(),
1400 semantic_type: SemanticType::Timestamp as i32,
1401 ..Default::default()
1402 },
1403 ColumnDef {
1404 name: "column2".to_string(),
1405 semantic_type: SemanticType::Field as i32,
1406 ..Default::default()
1407 },
1408 ],
1409 primary_keys: vec!["column3".to_string()],
1410 ..Default::default()
1411 };
1412
1413 let mut create_table_task =
1414 CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1415
1416 create_table_task.sort_columns();
1418
1419 assert_eq!(
1421 create_table_task.create_table.column_defs[0].name,
1422 "column1".to_string()
1423 );
1424 assert_eq!(
1425 create_table_task.create_table.column_defs[1].name,
1426 "column2".to_string()
1427 );
1428 assert_eq!(
1429 create_table_task.create_table.column_defs[2].name,
1430 "column3".to_string()
1431 );
1432
1433 assert_eq!(
1435 create_table_task.table_info.meta.schema.timestamp_index,
1436 Some(0)
1437 );
1438 assert_eq!(
1439 create_table_task.table_info.meta.primary_key_indices,
1440 vec![2]
1441 );
1442 assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
1443 }
1444}