1use std::collections::{HashMap, HashSet};
16use std::result;
17
18use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
19use api::v1::meta::ddl_task_request::Task;
20use api::v1::meta::{
21 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
22 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
23 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
24 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
25 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
26 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
27 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
28 DropViewTask as PbDropViewTask, Partition, ProcedureId,
29 TruncateTableTask as PbTruncateTableTask,
30};
31use api::v1::{
32 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
33 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
34 Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
35};
36use base64::engine::general_purpose;
37use base64::Engine as _;
38use common_time::{DatabaseTimeToLive, Timezone};
39use prost::Message;
40use serde::{Deserialize, Serialize};
41use serde_with::{serde_as, DefaultOnNull};
42use session::context::{QueryContextBuilder, QueryContextRef};
43use snafu::{OptionExt, ResultExt};
44use table::metadata::{RawTableInfo, TableId};
45use table::table_name::TableName;
46use table::table_reference::TableReference;
47
48use crate::error::{
49 self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
50 Result,
51};
52use crate::key::FlowId;
53
54#[derive(Debug, Clone)]
56pub enum DdlTask {
57 CreateTable(CreateTableTask),
58 DropTable(DropTableTask),
59 AlterTable(AlterTableTask),
60 TruncateTable(TruncateTableTask),
61 CreateLogicalTables(Vec<CreateTableTask>),
62 DropLogicalTables(Vec<DropTableTask>),
63 AlterLogicalTables(Vec<AlterTableTask>),
64 CreateDatabase(CreateDatabaseTask),
65 DropDatabase(DropDatabaseTask),
66 AlterDatabase(AlterDatabaseTask),
67 CreateFlow(CreateFlowTask),
68 DropFlow(DropFlowTask),
69 CreateView(CreateViewTask),
70 DropView(DropViewTask),
71}
72
73impl DdlTask {
74 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
76 DdlTask::CreateFlow(expr)
77 }
78
79 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
81 DdlTask::DropFlow(expr)
82 }
83
84 pub fn new_drop_view(expr: DropViewTask) -> Self {
86 DdlTask::DropView(expr)
87 }
88
89 pub fn new_create_table(
91 expr: CreateTableExpr,
92 partitions: Vec<Partition>,
93 table_info: RawTableInfo,
94 ) -> Self {
95 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
96 }
97
98 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
100 DdlTask::CreateLogicalTables(
101 table_data
102 .into_iter()
103 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
104 .collect(),
105 )
106 }
107
108 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
110 DdlTask::AlterLogicalTables(
111 table_data
112 .into_iter()
113 .map(|alter_table| AlterTableTask { alter_table })
114 .collect(),
115 )
116 }
117
118 pub fn new_drop_table(
120 catalog: String,
121 schema: String,
122 table: String,
123 table_id: TableId,
124 drop_if_exists: bool,
125 ) -> Self {
126 DdlTask::DropTable(DropTableTask {
127 catalog,
128 schema,
129 table,
130 table_id,
131 drop_if_exists,
132 })
133 }
134
135 pub fn new_create_database(
137 catalog: String,
138 schema: String,
139 create_if_not_exists: bool,
140 options: HashMap<String, String>,
141 ) -> Self {
142 DdlTask::CreateDatabase(CreateDatabaseTask {
143 catalog,
144 schema,
145 create_if_not_exists,
146 options,
147 })
148 }
149
150 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
152 DdlTask::DropDatabase(DropDatabaseTask {
153 catalog,
154 schema,
155 drop_if_exists,
156 })
157 }
158
159 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
161 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
162 }
163
164 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
166 DdlTask::AlterTable(AlterTableTask { alter_table })
167 }
168
169 pub fn new_truncate_table(
171 catalog: String,
172 schema: String,
173 table: String,
174 table_id: TableId,
175 ) -> Self {
176 DdlTask::TruncateTable(TruncateTableTask {
177 catalog,
178 schema,
179 table,
180 table_id,
181 })
182 }
183
184 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
186 DdlTask::CreateView(CreateViewTask {
187 create_view,
188 view_info,
189 })
190 }
191}
192
193impl TryFrom<Task> for DdlTask {
194 type Error = error::Error;
195 fn try_from(task: Task) -> Result<Self> {
196 match task {
197 Task::CreateTableTask(create_table) => {
198 Ok(DdlTask::CreateTable(create_table.try_into()?))
199 }
200 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
201 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
202 Task::TruncateTableTask(truncate_table) => {
203 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
204 }
205 Task::CreateTableTasks(create_tables) => {
206 let tasks = create_tables
207 .tasks
208 .into_iter()
209 .map(|task| task.try_into())
210 .collect::<Result<Vec<_>>>()?;
211
212 Ok(DdlTask::CreateLogicalTables(tasks))
213 }
214 Task::DropTableTasks(drop_tables) => {
215 let tasks = drop_tables
216 .tasks
217 .into_iter()
218 .map(|task| task.try_into())
219 .collect::<Result<Vec<_>>>()?;
220
221 Ok(DdlTask::DropLogicalTables(tasks))
222 }
223 Task::AlterTableTasks(alter_tables) => {
224 let tasks = alter_tables
225 .tasks
226 .into_iter()
227 .map(|task| task.try_into())
228 .collect::<Result<Vec<_>>>()?;
229
230 Ok(DdlTask::AlterLogicalTables(tasks))
231 }
232 Task::CreateDatabaseTask(create_database) => {
233 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
234 }
235 Task::DropDatabaseTask(drop_database) => {
236 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
237 }
238 Task::AlterDatabaseTask(alter_database) => {
239 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
240 }
241 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
242 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
243 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
244 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
245 }
246 }
247}
248
249#[derive(Clone)]
250pub struct SubmitDdlTaskRequest {
251 pub query_context: QueryContextRef,
252 pub task: DdlTask,
253}
254
255impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
256 type Error = error::Error;
257
258 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
259 let task = match request.task {
260 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
261 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
262 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
263 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
264 DdlTask::CreateLogicalTables(tasks) => {
265 let tasks = tasks
266 .into_iter()
267 .map(|task| task.try_into())
268 .collect::<Result<Vec<_>>>()?;
269
270 Task::CreateTableTasks(PbCreateTableTasks { tasks })
271 }
272 DdlTask::DropLogicalTables(tasks) => {
273 let tasks = tasks
274 .into_iter()
275 .map(|task| task.into())
276 .collect::<Vec<_>>();
277
278 Task::DropTableTasks(PbDropTableTasks { tasks })
279 }
280 DdlTask::AlterLogicalTables(tasks) => {
281 let tasks = tasks
282 .into_iter()
283 .map(|task| task.try_into())
284 .collect::<Result<Vec<_>>>()?;
285
286 Task::AlterTableTasks(PbAlterTableTasks { tasks })
287 }
288 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
289 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
290 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
291 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
292 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
293 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
294 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
295 };
296
297 Ok(Self {
298 header: None,
299 query_context: Some((*request.query_context).clone().into()),
300 task: Some(task),
301 })
302 }
303}
304
305#[derive(Debug, Default)]
306pub struct SubmitDdlTaskResponse {
307 pub key: Vec<u8>,
308 pub table_ids: Vec<TableId>,
310}
311
312impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
313 type Error = error::Error;
314
315 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
316 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
317 Ok(Self {
318 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
319 table_ids,
320 })
321 }
322}
323
324impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
325 fn from(val: SubmitDdlTaskResponse) -> Self {
326 Self {
327 pid: Some(ProcedureId { key: val.key }),
328 table_ids: val
329 .table_ids
330 .into_iter()
331 .map(|id| api::v1::TableId { id })
332 .collect(),
333 ..Default::default()
334 }
335 }
336}
337
338#[derive(Debug, PartialEq, Clone)]
340pub struct CreateViewTask {
341 pub create_view: CreateViewExpr,
342 pub view_info: RawTableInfo,
343}
344
345impl CreateViewTask {
346 pub fn table_ref(&self) -> TableReference {
348 TableReference {
349 catalog: &self.create_view.catalog_name,
350 schema: &self.create_view.schema_name,
351 table: &self.create_view.view_name,
352 }
353 }
354
355 pub fn raw_logical_plan(&self) -> &Vec<u8> {
357 &self.create_view.logical_plan
358 }
359
360 pub fn view_definition(&self) -> &str {
362 &self.create_view.definition
363 }
364
365 pub fn table_names(&self) -> HashSet<TableName> {
367 self.create_view
368 .table_names
369 .iter()
370 .map(|t| t.clone().into())
371 .collect()
372 }
373
374 pub fn columns(&self) -> &Vec<String> {
376 &self.create_view.columns
377 }
378
379 pub fn plan_columns(&self) -> &Vec<String> {
381 &self.create_view.plan_columns
382 }
383}
384
385impl TryFrom<PbCreateViewTask> for CreateViewTask {
386 type Error = error::Error;
387
388 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
389 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
390
391 Ok(CreateViewTask {
392 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
393 err_msg: "expected create view",
394 })?,
395 view_info,
396 })
397 }
398}
399
400impl TryFrom<CreateViewTask> for PbCreateViewTask {
401 type Error = error::Error;
402
403 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
404 Ok(PbCreateViewTask {
405 create_view: Some(task.create_view),
406 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
407 })
408 }
409}
410
411impl Serialize for CreateViewTask {
412 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
413 where
414 S: serde::Serializer,
415 {
416 let view_info = serde_json::to_vec(&self.view_info)
417 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
418
419 let pb = PbCreateViewTask {
420 create_view: Some(self.create_view.clone()),
421 view_info,
422 };
423 let buf = pb.encode_to_vec();
424 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
425 serializer.serialize_str(&encoded)
426 }
427}
428
429impl<'de> Deserialize<'de> for CreateViewTask {
430 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
431 where
432 D: serde::Deserializer<'de>,
433 {
434 let encoded = String::deserialize(deserializer)?;
435 let buf = general_purpose::STANDARD_NO_PAD
436 .decode(encoded)
437 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
438 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
439 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
440
441 let expr = CreateViewTask::try_from(expr)
442 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
443
444 Ok(expr)
445 }
446}
447
448#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
450pub struct DropViewTask {
451 pub catalog: String,
452 pub schema: String,
453 pub view: String,
454 pub view_id: TableId,
455 pub drop_if_exists: bool,
456}
457
458impl DropViewTask {
459 pub fn table_ref(&self) -> TableReference {
461 TableReference {
462 catalog: &self.catalog,
463 schema: &self.schema,
464 table: &self.view,
465 }
466 }
467}
468
469impl TryFrom<PbDropViewTask> for DropViewTask {
470 type Error = error::Error;
471
472 fn try_from(pb: PbDropViewTask) -> Result<Self> {
473 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
474 err_msg: "expected drop view",
475 })?;
476
477 Ok(DropViewTask {
478 catalog: expr.catalog_name,
479 schema: expr.schema_name,
480 view: expr.view_name,
481 view_id: expr
482 .view_id
483 .context(error::InvalidProtoMsgSnafu {
484 err_msg: "expected view_id",
485 })?
486 .id,
487 drop_if_exists: expr.drop_if_exists,
488 })
489 }
490}
491
492impl From<DropViewTask> for PbDropViewTask {
493 fn from(task: DropViewTask) -> Self {
494 PbDropViewTask {
495 drop_view: Some(DropViewExpr {
496 catalog_name: task.catalog,
497 schema_name: task.schema,
498 view_name: task.view,
499 view_id: Some(api::v1::TableId { id: task.view_id }),
500 drop_if_exists: task.drop_if_exists,
501 }),
502 }
503 }
504}
505
506#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
507pub struct DropTableTask {
508 pub catalog: String,
509 pub schema: String,
510 pub table: String,
511 pub table_id: TableId,
512 #[serde(default)]
513 pub drop_if_exists: bool,
514}
515
516impl DropTableTask {
517 pub fn table_ref(&self) -> TableReference {
518 TableReference {
519 catalog: &self.catalog,
520 schema: &self.schema,
521 table: &self.table,
522 }
523 }
524
525 pub fn table_name(&self) -> TableName {
526 TableName {
527 catalog_name: self.catalog.to_string(),
528 schema_name: self.schema.to_string(),
529 table_name: self.table.to_string(),
530 }
531 }
532}
533
534impl TryFrom<PbDropTableTask> for DropTableTask {
535 type Error = error::Error;
536
537 fn try_from(pb: PbDropTableTask) -> Result<Self> {
538 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
539 err_msg: "expected drop table",
540 })?;
541
542 Ok(Self {
543 catalog: drop_table.catalog_name,
544 schema: drop_table.schema_name,
545 table: drop_table.table_name,
546 table_id: drop_table
547 .table_id
548 .context(error::InvalidProtoMsgSnafu {
549 err_msg: "expected table_id",
550 })?
551 .id,
552 drop_if_exists: drop_table.drop_if_exists,
553 })
554 }
555}
556
557impl From<DropTableTask> for PbDropTableTask {
558 fn from(task: DropTableTask) -> Self {
559 PbDropTableTask {
560 drop_table: Some(DropTableExpr {
561 catalog_name: task.catalog,
562 schema_name: task.schema,
563 table_name: task.table,
564 table_id: Some(api::v1::TableId { id: task.table_id }),
565 drop_if_exists: task.drop_if_exists,
566 }),
567 }
568 }
569}
570
571#[derive(Debug, PartialEq, Clone)]
572pub struct CreateTableTask {
573 pub create_table: CreateTableExpr,
574 pub partitions: Vec<Partition>,
575 pub table_info: RawTableInfo,
576}
577
578impl TryFrom<PbCreateTableTask> for CreateTableTask {
579 type Error = error::Error;
580
581 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
582 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
583
584 Ok(CreateTableTask::new(
585 pb.create_table.context(error::InvalidProtoMsgSnafu {
586 err_msg: "expected create table",
587 })?,
588 pb.partitions,
589 table_info,
590 ))
591 }
592}
593
594impl TryFrom<CreateTableTask> for PbCreateTableTask {
595 type Error = error::Error;
596
597 fn try_from(task: CreateTableTask) -> Result<Self> {
598 Ok(PbCreateTableTask {
599 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
600 create_table: Some(task.create_table),
601 partitions: task.partitions,
602 })
603 }
604}
605
606impl CreateTableTask {
607 pub fn new(
608 expr: CreateTableExpr,
609 partitions: Vec<Partition>,
610 table_info: RawTableInfo,
611 ) -> CreateTableTask {
612 CreateTableTask {
613 create_table: expr,
614 partitions,
615 table_info,
616 }
617 }
618
619 pub fn table_name(&self) -> TableName {
620 let table = &self.create_table;
621
622 TableName {
623 catalog_name: table.catalog_name.to_string(),
624 schema_name: table.schema_name.to_string(),
625 table_name: table.table_name.to_string(),
626 }
627 }
628
629 pub fn table_ref(&self) -> TableReference {
630 let table = &self.create_table;
631
632 TableReference {
633 catalog: &table.catalog_name,
634 schema: &table.schema_name,
635 table: &table.table_name,
636 }
637 }
638
639 pub fn set_table_id(&mut self, table_id: TableId) {
641 self.table_info.ident.table_id = table_id;
642 }
643
644 pub fn sort_columns(&mut self) {
649 self.create_table
652 .column_defs
653 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
654
655 self.table_info.sort_columns();
656 }
657}
658
659impl Serialize for CreateTableTask {
660 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
661 where
662 S: serde::Serializer,
663 {
664 let table_info = serde_json::to_vec(&self.table_info)
665 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
666
667 let pb = PbCreateTableTask {
668 create_table: Some(self.create_table.clone()),
669 partitions: self.partitions.clone(),
670 table_info,
671 };
672 let buf = pb.encode_to_vec();
673 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
674 serializer.serialize_str(&encoded)
675 }
676}
677
678impl<'de> Deserialize<'de> for CreateTableTask {
679 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
680 where
681 D: serde::Deserializer<'de>,
682 {
683 let encoded = String::deserialize(deserializer)?;
684 let buf = general_purpose::STANDARD_NO_PAD
685 .decode(encoded)
686 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
687 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
688 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
689
690 let expr = CreateTableTask::try_from(expr)
691 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
692
693 Ok(expr)
694 }
695}
696
697#[derive(Debug, PartialEq, Clone)]
698pub struct AlterTableTask {
699 pub alter_table: AlterTableExpr,
701}
702
703impl AlterTableTask {
704 pub fn validate(&self) -> Result<()> {
705 self.alter_table
706 .kind
707 .as_ref()
708 .context(error::UnexpectedSnafu {
709 err_msg: "'kind' is absent",
710 })?;
711 Ok(())
712 }
713
714 pub fn table_ref(&self) -> TableReference {
715 TableReference {
716 catalog: &self.alter_table.catalog_name,
717 schema: &self.alter_table.schema_name,
718 table: &self.alter_table.table_name,
719 }
720 }
721
722 pub fn table_name(&self) -> TableName {
723 let table = &self.alter_table;
724
725 TableName {
726 catalog_name: table.catalog_name.to_string(),
727 schema_name: table.schema_name.to_string(),
728 table_name: table.table_name.to_string(),
729 }
730 }
731}
732
733impl TryFrom<PbAlterTableTask> for AlterTableTask {
734 type Error = error::Error;
735
736 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
737 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
738 err_msg: "expected alter_table",
739 })?;
740
741 Ok(AlterTableTask { alter_table })
742 }
743}
744
745impl TryFrom<AlterTableTask> for PbAlterTableTask {
746 type Error = error::Error;
747
748 fn try_from(task: AlterTableTask) -> Result<Self> {
749 Ok(PbAlterTableTask {
750 alter_table: Some(task.alter_table),
751 })
752 }
753}
754
755impl Serialize for AlterTableTask {
756 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
757 where
758 S: serde::Serializer,
759 {
760 let pb = PbAlterTableTask {
761 alter_table: Some(self.alter_table.clone()),
762 };
763 let buf = pb.encode_to_vec();
764 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
765 serializer.serialize_str(&encoded)
766 }
767}
768
769impl<'de> Deserialize<'de> for AlterTableTask {
770 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
771 where
772 D: serde::Deserializer<'de>,
773 {
774 let encoded = String::deserialize(deserializer)?;
775 let buf = general_purpose::STANDARD_NO_PAD
776 .decode(encoded)
777 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
778 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
779 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
780
781 let expr = AlterTableTask::try_from(expr)
782 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
783
784 Ok(expr)
785 }
786}
787
788#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
789pub struct TruncateTableTask {
790 pub catalog: String,
791 pub schema: String,
792 pub table: String,
793 pub table_id: TableId,
794}
795
796impl TruncateTableTask {
797 pub fn table_ref(&self) -> TableReference {
798 TableReference {
799 catalog: &self.catalog,
800 schema: &self.schema,
801 table: &self.table,
802 }
803 }
804
805 pub fn table_name(&self) -> TableName {
806 TableName {
807 catalog_name: self.catalog.to_string(),
808 schema_name: self.schema.to_string(),
809 table_name: self.table.to_string(),
810 }
811 }
812}
813
814impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
815 type Error = error::Error;
816
817 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
818 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
819 err_msg: "expected truncate table",
820 })?;
821
822 Ok(Self {
823 catalog: truncate_table.catalog_name,
824 schema: truncate_table.schema_name,
825 table: truncate_table.table_name,
826 table_id: truncate_table
827 .table_id
828 .context(error::InvalidProtoMsgSnafu {
829 err_msg: "expected table_id",
830 })?
831 .id,
832 })
833 }
834}
835
836impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
837 type Error = error::Error;
838
839 fn try_from(task: TruncateTableTask) -> Result<Self> {
840 Ok(PbTruncateTableTask {
841 truncate_table: Some(TruncateTableExpr {
842 catalog_name: task.catalog,
843 schema_name: task.schema,
844 table_name: task.table,
845 table_id: Some(api::v1::TableId { id: task.table_id }),
846 }),
847 })
848 }
849}
850
851#[serde_as]
852#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
853pub struct CreateDatabaseTask {
854 pub catalog: String,
855 pub schema: String,
856 pub create_if_not_exists: bool,
857 #[serde_as(deserialize_as = "DefaultOnNull")]
858 pub options: HashMap<String, String>,
859}
860
861impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
862 type Error = error::Error;
863
864 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
865 let CreateDatabaseExpr {
866 catalog_name,
867 schema_name,
868 create_if_not_exists,
869 options,
870 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
871 err_msg: "expected create database",
872 })?;
873
874 Ok(CreateDatabaseTask {
875 catalog: catalog_name,
876 schema: schema_name,
877 create_if_not_exists,
878 options,
879 })
880 }
881}
882
883impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
884 type Error = error::Error;
885
886 fn try_from(
887 CreateDatabaseTask {
888 catalog,
889 schema,
890 create_if_not_exists,
891 options,
892 }: CreateDatabaseTask,
893 ) -> Result<Self> {
894 Ok(PbCreateDatabaseTask {
895 create_database: Some(CreateDatabaseExpr {
896 catalog_name: catalog,
897 schema_name: schema,
898 create_if_not_exists,
899 options,
900 }),
901 })
902 }
903}
904
905#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
906pub struct DropDatabaseTask {
907 pub catalog: String,
908 pub schema: String,
909 pub drop_if_exists: bool,
910}
911
912impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
913 type Error = error::Error;
914
915 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
916 let DropDatabaseExpr {
917 catalog_name,
918 schema_name,
919 drop_if_exists,
920 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
921 err_msg: "expected drop database",
922 })?;
923
924 Ok(DropDatabaseTask {
925 catalog: catalog_name,
926 schema: schema_name,
927 drop_if_exists,
928 })
929 }
930}
931
932impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
933 type Error = error::Error;
934
935 fn try_from(
936 DropDatabaseTask {
937 catalog,
938 schema,
939 drop_if_exists,
940 }: DropDatabaseTask,
941 ) -> Result<Self> {
942 Ok(PbDropDatabaseTask {
943 drop_database: Some(DropDatabaseExpr {
944 catalog_name: catalog,
945 schema_name: schema,
946 drop_if_exists,
947 }),
948 })
949 }
950}
951
952#[derive(Debug, PartialEq, Clone)]
953pub struct AlterDatabaseTask {
954 pub alter_expr: AlterDatabaseExpr,
955}
956
957impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
958 type Error = error::Error;
959
960 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
961 Ok(PbAlterDatabaseTask {
962 task: Some(task.alter_expr),
963 })
964 }
965}
966
967impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
968 type Error = error::Error;
969
970 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
971 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
972 err_msg: "expected alter database",
973 })?;
974
975 Ok(AlterDatabaseTask { alter_expr })
976 }
977}
978
979impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
980 type Error = error::Error;
981
982 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
983 match pb {
984 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
985 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
986 options
987 .set_database_options
988 .into_iter()
989 .map(SetDatabaseOption::try_from)
990 .collect::<Result<Vec<_>>>()?,
991 )))
992 }
993 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
994 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
995 options
996 .keys
997 .iter()
998 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
999 .collect::<Result<Vec<_>>>()?,
1000 )),
1001 ),
1002 }
1003 }
1004}
1005
1006const TTL_KEY: &str = "ttl";
1007
1008impl TryFrom<PbOption> for SetDatabaseOption {
1009 type Error = error::Error;
1010
1011 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1012 match key.to_ascii_lowercase().as_str() {
1013 TTL_KEY => {
1014 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1015 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1016
1017 Ok(SetDatabaseOption::Ttl(ttl))
1018 }
1019 _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1020 }
1021 }
1022}
1023
1024#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1025pub enum SetDatabaseOption {
1026 Ttl(DatabaseTimeToLive),
1027}
1028
1029#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1030pub enum UnsetDatabaseOption {
1031 Ttl,
1032}
1033
1034impl TryFrom<&str> for UnsetDatabaseOption {
1035 type Error = error::Error;
1036
1037 fn try_from(key: &str) -> Result<Self> {
1038 match key.to_ascii_lowercase().as_str() {
1039 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1040 _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1041 }
1042 }
1043}
1044
1045#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1046pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1047
1048#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1049pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1050
1051#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1052pub enum AlterDatabaseKind {
1053 SetDatabaseOptions(SetDatabaseOptions),
1054 UnsetDatabaseOptions(UnsetDatabaseOptions),
1055}
1056
1057impl AlterDatabaseTask {
1058 pub fn catalog(&self) -> &str {
1059 &self.alter_expr.catalog_name
1060 }
1061
1062 pub fn schema(&self) -> &str {
1063 &self.alter_expr.catalog_name
1064 }
1065}
1066
1067#[derive(Debug, Clone, Serialize, Deserialize)]
1069pub struct CreateFlowTask {
1070 pub catalog_name: String,
1071 pub flow_name: String,
1072 pub source_table_names: Vec<TableName>,
1073 pub sink_table_name: TableName,
1074 pub or_replace: bool,
1075 pub create_if_not_exists: bool,
1076 pub expire_after: Option<i64>,
1078 pub comment: String,
1079 pub sql: String,
1080 pub flow_options: HashMap<String, String>,
1081}
1082
1083impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1084 type Error = error::Error;
1085
1086 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1087 let CreateFlowExpr {
1088 catalog_name,
1089 flow_name,
1090 source_table_names,
1091 sink_table_name,
1092 or_replace,
1093 create_if_not_exists,
1094 expire_after,
1095 comment,
1096 sql,
1097 flow_options,
1098 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1099 err_msg: "expected create_flow",
1100 })?;
1101
1102 Ok(CreateFlowTask {
1103 catalog_name,
1104 flow_name,
1105 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1106 sink_table_name: sink_table_name
1107 .context(error::InvalidProtoMsgSnafu {
1108 err_msg: "expected sink_table_name",
1109 })?
1110 .into(),
1111 or_replace,
1112 create_if_not_exists,
1113 expire_after: expire_after.map(|e| e.value),
1114 comment,
1115 sql,
1116 flow_options,
1117 })
1118 }
1119}
1120
1121impl From<CreateFlowTask> for PbCreateFlowTask {
1122 fn from(
1123 CreateFlowTask {
1124 catalog_name,
1125 flow_name,
1126 source_table_names,
1127 sink_table_name,
1128 or_replace,
1129 create_if_not_exists,
1130 expire_after,
1131 comment,
1132 sql,
1133 flow_options,
1134 }: CreateFlowTask,
1135 ) -> Self {
1136 PbCreateFlowTask {
1137 create_flow: Some(CreateFlowExpr {
1138 catalog_name,
1139 flow_name,
1140 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1141 sink_table_name: Some(sink_table_name.into()),
1142 or_replace,
1143 create_if_not_exists,
1144 expire_after: expire_after.map(|value| ExpireAfter { value }),
1145 comment,
1146 sql,
1147 flow_options,
1148 }),
1149 }
1150 }
1151}
1152
1153#[derive(Debug, Clone, Serialize, Deserialize)]
1155pub struct DropFlowTask {
1156 pub catalog_name: String,
1157 pub flow_name: String,
1158 pub flow_id: FlowId,
1159 pub drop_if_exists: bool,
1160}
1161
1162impl TryFrom<PbDropFlowTask> for DropFlowTask {
1163 type Error = error::Error;
1164
1165 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1166 let DropFlowExpr {
1167 catalog_name,
1168 flow_name,
1169 flow_id,
1170 drop_if_exists,
1171 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1172 err_msg: "expected drop_flow",
1173 })?;
1174 let flow_id = flow_id
1175 .context(error::InvalidProtoMsgSnafu {
1176 err_msg: "expected flow_id",
1177 })?
1178 .id;
1179 Ok(DropFlowTask {
1180 catalog_name,
1181 flow_name,
1182 flow_id,
1183 drop_if_exists,
1184 })
1185 }
1186}
1187
1188impl From<DropFlowTask> for PbDropFlowTask {
1189 fn from(
1190 DropFlowTask {
1191 catalog_name,
1192 flow_name,
1193 flow_id,
1194 drop_if_exists,
1195 }: DropFlowTask,
1196 ) -> Self {
1197 PbDropFlowTask {
1198 drop_flow: Some(DropFlowExpr {
1199 catalog_name,
1200 flow_name,
1201 flow_id: Some(api::v1::FlowId { id: flow_id }),
1202 drop_if_exists,
1203 }),
1204 }
1205 }
1206}
1207
1208#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1209pub struct QueryContext {
1210 current_catalog: String,
1211 current_schema: String,
1212 timezone: String,
1213 extensions: HashMap<String, String>,
1214 channel: u8,
1215}
1216
1217impl From<QueryContextRef> for QueryContext {
1218 fn from(query_context: QueryContextRef) -> Self {
1219 QueryContext {
1220 current_catalog: query_context.current_catalog().to_string(),
1221 current_schema: query_context.current_schema().to_string(),
1222 timezone: query_context.timezone().to_string(),
1223 extensions: query_context.extensions(),
1224 channel: query_context.channel() as u8,
1225 }
1226 }
1227}
1228
1229impl TryFrom<QueryContext> for session::context::QueryContext {
1230 type Error = error::Error;
1231 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1232 Ok(QueryContextBuilder::default()
1233 .current_catalog(value.current_catalog)
1234 .current_schema(value.current_schema)
1235 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1236 .extensions(value.extensions)
1237 .channel((value.channel as u32).into())
1238 .build())
1239 }
1240}
1241
1242impl From<QueryContext> for PbQueryContext {
1243 fn from(
1244 QueryContext {
1245 current_catalog,
1246 current_schema,
1247 timezone,
1248 extensions,
1249 channel,
1250 }: QueryContext,
1251 ) -> Self {
1252 PbQueryContext {
1253 current_catalog,
1254 current_schema,
1255 timezone,
1256 extensions,
1257 channel: channel as u32,
1258 snapshot_seqs: None,
1259 explain: None,
1260 }
1261 }
1262}
1263
1264#[cfg(test)]
1265mod tests {
1266 use std::sync::Arc;
1267
1268 use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1269 use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1270 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1271 use store_api::storage::ConcreteDataType;
1272 use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1273 use table::test_util::table_info::test_table_info;
1274
1275 use super::{AlterTableTask, CreateTableTask};
1276
1277 #[test]
1278 fn test_basic_ser_de_create_table_task() {
1279 let schema = SchemaBuilder::default().build().unwrap();
1280 let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1281 let task = CreateTableTask::new(
1282 CreateTableExpr::default(),
1283 Vec::new(),
1284 RawTableInfo::from(table_info),
1285 );
1286
1287 let output = serde_json::to_vec(&task).unwrap();
1288
1289 let de = serde_json::from_slice(&output).unwrap();
1290 assert_eq!(task, de);
1291 }
1292
1293 #[test]
1294 fn test_basic_ser_de_alter_table_task() {
1295 let task = AlterTableTask {
1296 alter_table: AlterTableExpr::default(),
1297 };
1298
1299 let output = serde_json::to_vec(&task).unwrap();
1300
1301 let de = serde_json::from_slice(&output).unwrap();
1302 assert_eq!(task, de);
1303 }
1304
1305 #[test]
1306 fn test_sort_columns() {
1307 let raw_schema = RawSchema {
1309 column_schemas: vec![
1310 ColumnSchema::new(
1311 "column3".to_string(),
1312 ConcreteDataType::string_datatype(),
1313 true,
1314 ),
1315 ColumnSchema::new(
1316 "column1".to_string(),
1317 ConcreteDataType::timestamp_millisecond_datatype(),
1318 false,
1319 )
1320 .with_time_index(true),
1321 ColumnSchema::new(
1322 "column2".to_string(),
1323 ConcreteDataType::float64_datatype(),
1324 true,
1325 ),
1326 ],
1327 timestamp_index: Some(1),
1328 version: 0,
1329 };
1330
1331 let raw_table_meta = RawTableMeta {
1333 schema: raw_schema,
1334 primary_key_indices: vec![0],
1335 value_indices: vec![2],
1336 engine: METRIC_ENGINE_NAME.to_string(),
1337 next_column_id: 0,
1338 region_numbers: vec![0],
1339 options: Default::default(),
1340 created_on: Default::default(),
1341 partition_key_indices: Default::default(),
1342 };
1343
1344 let raw_table_info = RawTableInfo {
1346 ident: Default::default(),
1347 meta: raw_table_meta,
1348 name: Default::default(),
1349 desc: Default::default(),
1350 catalog_name: Default::default(),
1351 schema_name: Default::default(),
1352 table_type: TableType::Base,
1353 };
1354
1355 let create_table_expr = CreateTableExpr {
1357 column_defs: vec![
1358 ColumnDef {
1359 name: "column3".to_string(),
1360 semantic_type: SemanticType::Tag as i32,
1361 ..Default::default()
1362 },
1363 ColumnDef {
1364 name: "column1".to_string(),
1365 semantic_type: SemanticType::Timestamp as i32,
1366 ..Default::default()
1367 },
1368 ColumnDef {
1369 name: "column2".to_string(),
1370 semantic_type: SemanticType::Field as i32,
1371 ..Default::default()
1372 },
1373 ],
1374 primary_keys: vec!["column3".to_string()],
1375 ..Default::default()
1376 };
1377
1378 let mut create_table_task =
1379 CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1380
1381 create_table_task.sort_columns();
1383
1384 assert_eq!(
1386 create_table_task.create_table.column_defs[0].name,
1387 "column1".to_string()
1388 );
1389 assert_eq!(
1390 create_table_task.create_table.column_defs[1].name,
1391 "column2".to_string()
1392 );
1393 assert_eq!(
1394 create_table_task.create_table.column_defs[2].name,
1395 "column3".to_string()
1396 );
1397
1398 assert_eq!(
1400 create_table_task.table_info.meta.schema.timestamp_index,
1401 Some(0)
1402 );
1403 assert_eq!(
1404 create_table_task.table_info.meta.primary_key_indices,
1405 vec![2]
1406 );
1407 assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
1408 }
1409}