1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27 AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28 CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29 CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30 CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31 DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32 DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33 DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34 TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37 AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38 CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39 DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40 QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57 InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// A distributed DDL task to be executed by the procedure framework.
///
/// Each variant carries the payload for one DDL statement. The
/// `*LogicalTables` variants batch several table-level operations into a
/// single task, and the trigger variants exist only when the `enterprise`
/// feature is enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    // Batched operations on logical tables.
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    CommentOn(CommentOnTask),
}
84
85impl DdlTask {
86 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
88 DdlTask::CreateFlow(expr)
89 }
90
91 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
93 DdlTask::DropFlow(expr)
94 }
95
96 pub fn new_drop_view(expr: DropViewTask) -> Self {
98 DdlTask::DropView(expr)
99 }
100
101 pub fn new_create_table(
103 expr: CreateTableExpr,
104 partitions: Vec<Partition>,
105 table_info: TableInfo,
106 ) -> Self {
107 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
108 }
109
110 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
112 DdlTask::CreateLogicalTables(
113 table_data
114 .into_iter()
115 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
116 .collect(),
117 )
118 }
119
120 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
122 DdlTask::AlterLogicalTables(
123 table_data
124 .into_iter()
125 .map(|alter_table| AlterTableTask { alter_table })
126 .collect(),
127 )
128 }
129
130 pub fn new_drop_table(
132 catalog: String,
133 schema: String,
134 table: String,
135 table_id: TableId,
136 drop_if_exists: bool,
137 ) -> Self {
138 DdlTask::DropTable(DropTableTask {
139 catalog,
140 schema,
141 table,
142 table_id,
143 drop_if_exists,
144 })
145 }
146
147 pub fn new_create_database(
149 catalog: String,
150 schema: String,
151 create_if_not_exists: bool,
152 options: HashMap<String, String>,
153 ) -> Self {
154 DdlTask::CreateDatabase(CreateDatabaseTask {
155 catalog,
156 schema,
157 create_if_not_exists,
158 options,
159 })
160 }
161
162 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
164 DdlTask::DropDatabase(DropDatabaseTask {
165 catalog,
166 schema,
167 drop_if_exists,
168 })
169 }
170
171 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
173 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
174 }
175
176 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
178 DdlTask::AlterTable(AlterTableTask { alter_table })
179 }
180
181 pub fn new_truncate_table(
183 catalog: String,
184 schema: String,
185 table: String,
186 table_id: TableId,
187 time_ranges: Vec<(Timestamp, Timestamp)>,
188 ) -> Self {
189 DdlTask::TruncateTable(TruncateTableTask {
190 catalog,
191 schema,
192 table,
193 table_id,
194 time_ranges,
195 })
196 }
197
198 pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
200 DdlTask::CreateView(CreateViewTask {
201 create_view,
202 view_info,
203 })
204 }
205
206 pub fn new_comment_on(task: CommentOnTask) -> Self {
208 DdlTask::CommentOn(task)
209 }
210}
211
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    /// Converts a protobuf [`Task`] into the in-memory [`DdlTask`],
    /// delegating to each payload's own `TryFrom` conversion.
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            // Batched variants: fail the whole conversion on the first bad sub-task.
            Task::CreateTableTasks(create_tables) => {
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            // Trigger tasks are enterprise-only; community builds reject them
            // with an "unsupported" error instead of silently ignoring them.
            Task::CreateTriggerTask(create_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
        }
    }
}
292
/// A request submitting one [`DdlTask`] for execution.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context (catalog/schema, etc.) the DDL runs under.
    pub query_context: QueryContext,
    /// Whether the caller blocks until the task finishes (see `default_wait`).
    pub wait: bool,
    /// Timeout forwarded to the server as whole seconds.
    pub timeout: Duration,
    /// The DDL operation to execute.
    pub task: DdlTask,
}
300
301impl SubmitDdlTaskRequest {
302 pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
304 Self {
305 query_context,
306 wait: Self::default_wait(),
307 timeout: Self::default_timeout(),
308 task,
309 }
310 }
311
312 pub fn default_timeout() -> Duration {
314 Duration::from_secs(60)
315 }
316
317 pub fn default_wait() -> bool {
319 true
320 }
321}
322
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    /// Converts the request into its protobuf form, serializing each task
    /// variant into the matching protobuf task payload.
    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
        };

        Ok(Self {
            header: None,
            query_context: Some(request.query_context.into()),
            // NOTE(review): truncating cast — a timeout above u32::MAX seconds
            // would wrap; in practice timeouts are far smaller.
            timeout_secs: request.timeout.as_secs() as u32,
            wait: request.wait,
            task: Some(task),
        })
    }
}
379
/// The response to a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Procedure id key returned by the server (empty when absent).
    pub key: Vec<u8>,
    /// Ids of the tables involved in the task.
    pub table_ids: Vec<TableId>,
}
386
387impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
388 type Error = error::Error;
389
390 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
391 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
392 Ok(Self {
393 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
394 table_ids,
395 })
396 }
397}
398
399impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
400 fn from(val: SubmitDdlTaskResponse) -> Self {
401 Self {
402 pid: Some(ProcedureId { key: val.key }),
403 table_ids: val
404 .table_ids
405 .into_iter()
406 .map(|id| api::v1::TableId { id })
407 .collect(),
408 ..Default::default()
409 }
410 }
411}
412
/// A `CREATE VIEW` task: the view expression plus its table metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf expression describing the view.
    pub create_view: CreateViewExpr,
    /// Table metadata for the view (serialized as JSON over the wire).
    pub view_info: TableInfo,
}
419
420impl CreateViewTask {
421 pub fn table_ref(&self) -> TableReference<'_> {
423 TableReference {
424 catalog: &self.create_view.catalog_name,
425 schema: &self.create_view.schema_name,
426 table: &self.create_view.view_name,
427 }
428 }
429
430 pub fn raw_logical_plan(&self) -> &Vec<u8> {
432 &self.create_view.logical_plan
433 }
434
435 pub fn view_definition(&self) -> &str {
437 &self.create_view.definition
438 }
439
440 pub fn table_names(&self) -> HashSet<TableName> {
442 self.create_view
443 .table_names
444 .iter()
445 .map(|t| t.clone().into())
446 .collect()
447 }
448
449 pub fn columns(&self) -> &Vec<String> {
451 &self.create_view.columns
452 }
453
454 pub fn plan_columns(&self) -> &Vec<String> {
456 &self.create_view.plan_columns
457 }
458}
459
460impl TryFrom<PbCreateViewTask> for CreateViewTask {
461 type Error = error::Error;
462
463 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
464 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
465
466 Ok(CreateViewTask {
467 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
468 err_msg: "expected create view",
469 })?,
470 view_info,
471 })
472 }
473}
474
475impl TryFrom<CreateViewTask> for PbCreateViewTask {
476 type Error = error::Error;
477
478 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
479 Ok(PbCreateViewTask {
480 create_view: Some(task.create_view),
481 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
482 })
483 }
484}
485
486impl Serialize for CreateViewTask {
487 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
488 where
489 S: serde::Serializer,
490 {
491 let view_info = serde_json::to_vec(&self.view_info)
492 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
493
494 let pb = PbCreateViewTask {
495 create_view: Some(self.create_view.clone()),
496 view_info,
497 };
498 let buf = pb.encode_to_vec();
499 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
500 serializer.serialize_str(&encoded)
501 }
502}
503
504impl<'de> Deserialize<'de> for CreateViewTask {
505 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
506 where
507 D: serde::Deserializer<'de>,
508 {
509 let encoded = String::deserialize(deserializer)?;
510 let buf = general_purpose::STANDARD_NO_PAD
511 .decode(encoded)
512 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
513 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
514 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
515
516 let expr = CreateViewTask::try_from(expr)
517 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
518
519 Ok(expr)
520 }
521}
522
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    /// Name of the view to drop.
    pub view: String,
    /// Id of the view's table entry.
    pub view_id: TableId,
    /// When true, dropping a nonexistent view is not an error.
    pub drop_if_exists: bool,
}
532
533impl DropViewTask {
534 pub fn table_ref(&self) -> TableReference<'_> {
536 TableReference {
537 catalog: &self.catalog,
538 schema: &self.schema,
539 table: &self.view,
540 }
541 }
542}
543
544impl TryFrom<PbDropViewTask> for DropViewTask {
545 type Error = error::Error;
546
547 fn try_from(pb: PbDropViewTask) -> Result<Self> {
548 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
549 err_msg: "expected drop view",
550 })?;
551
552 Ok(DropViewTask {
553 catalog: expr.catalog_name,
554 schema: expr.schema_name,
555 view: expr.view_name,
556 view_id: expr
557 .view_id
558 .context(error::InvalidProtoMsgSnafu {
559 err_msg: "expected view_id",
560 })?
561 .id,
562 drop_if_exists: expr.drop_if_exists,
563 })
564 }
565}
566
567impl From<DropViewTask> for PbDropViewTask {
568 fn from(task: DropViewTask) -> Self {
569 PbDropViewTask {
570 drop_view: Some(DropViewExpr {
571 catalog_name: task.catalog,
572 schema_name: task.schema,
573 view_name: task.view,
574 view_id: Some(api::v1::TableId { id: task.view_id }),
575 drop_if_exists: task.drop_if_exists,
576 }),
577 }
578 }
579}
580
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    /// When true, dropping a nonexistent table is not an error.
    /// Defaults to `false` when absent in serialized data.
    #[serde(default)]
    pub drop_if_exists: bool,
}
590
591impl DropTableTask {
592 pub fn table_ref(&self) -> TableReference<'_> {
593 TableReference {
594 catalog: &self.catalog,
595 schema: &self.schema,
596 table: &self.table,
597 }
598 }
599
600 pub fn table_name(&self) -> TableName {
601 TableName {
602 catalog_name: self.catalog.clone(),
603 schema_name: self.schema.clone(),
604 table_name: self.table.clone(),
605 }
606 }
607}
608
609impl TryFrom<PbDropTableTask> for DropTableTask {
610 type Error = error::Error;
611
612 fn try_from(pb: PbDropTableTask) -> Result<Self> {
613 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
614 err_msg: "expected drop table",
615 })?;
616
617 Ok(Self {
618 catalog: drop_table.catalog_name,
619 schema: drop_table.schema_name,
620 table: drop_table.table_name,
621 table_id: drop_table
622 .table_id
623 .context(error::InvalidProtoMsgSnafu {
624 err_msg: "expected table_id",
625 })?
626 .id,
627 drop_if_exists: drop_table.drop_if_exists,
628 })
629 }
630}
631
632impl From<DropTableTask> for PbDropTableTask {
633 fn from(task: DropTableTask) -> Self {
634 PbDropTableTask {
635 drop_table: Some(DropTableExpr {
636 catalog_name: task.catalog,
637 schema_name: task.schema,
638 table_name: task.table,
639 table_id: Some(api::v1::TableId { id: task.table_id }),
640 drop_if_exists: task.drop_if_exists,
641 }),
642 }
643 }
644}
645
/// A `CREATE TABLE` task: the expression, partition rule and table metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    /// Partition rule entries; empty for logical tables.
    pub partitions: Vec<Partition>,
    /// Table metadata (serialized as JSON over the wire).
    pub table_info: TableInfo,
}
652
653impl TryFrom<PbCreateTableTask> for CreateTableTask {
654 type Error = error::Error;
655
656 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
657 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
658
659 Ok(CreateTableTask::new(
660 pb.create_table.context(error::InvalidProtoMsgSnafu {
661 err_msg: "expected create table",
662 })?,
663 pb.partitions,
664 table_info,
665 ))
666 }
667}
668
669impl TryFrom<CreateTableTask> for PbCreateTableTask {
670 type Error = error::Error;
671
672 fn try_from(task: CreateTableTask) -> Result<Self> {
673 Ok(PbCreateTableTask {
674 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
675 create_table: Some(task.create_table),
676 partitions: task.partitions,
677 })
678 }
679}
680
681impl CreateTableTask {
682 pub fn new(
683 expr: CreateTableExpr,
684 partitions: Vec<Partition>,
685 table_info: TableInfo,
686 ) -> CreateTableTask {
687 CreateTableTask {
688 create_table: expr,
689 partitions,
690 table_info,
691 }
692 }
693
694 pub fn table_name(&self) -> TableName {
695 let table = &self.create_table;
696
697 TableName {
698 catalog_name: table.catalog_name.clone(),
699 schema_name: table.schema_name.clone(),
700 table_name: table.table_name.clone(),
701 }
702 }
703
704 pub fn table_ref(&self) -> TableReference<'_> {
705 let table = &self.create_table;
706
707 TableReference {
708 catalog: &table.catalog_name,
709 schema: &table.schema_name,
710 table: &table.table_name,
711 }
712 }
713
714 pub fn set_table_id(&mut self, table_id: TableId) {
716 self.table_info.ident.table_id = table_id;
717 }
718
719 pub fn sort_columns(&mut self) {
724 self.create_table
727 .column_defs
728 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
729
730 self.table_info.sort_columns();
731 }
732}
733
734impl Serialize for CreateTableTask {
735 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
736 where
737 S: serde::Serializer,
738 {
739 let table_info = serde_json::to_vec(&self.table_info)
740 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
741
742 let pb = PbCreateTableTask {
743 create_table: Some(self.create_table.clone()),
744 partitions: self.partitions.clone(),
745 table_info,
746 };
747 let buf = pb.encode_to_vec();
748 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
749 serializer.serialize_str(&encoded)
750 }
751}
752
753impl<'de> Deserialize<'de> for CreateTableTask {
754 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
755 where
756 D: serde::Deserializer<'de>,
757 {
758 let encoded = String::deserialize(deserializer)?;
759 let buf = general_purpose::STANDARD_NO_PAD
760 .decode(encoded)
761 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
762 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
763 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
764
765 let expr = CreateTableTask::try_from(expr)
766 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
767
768 Ok(expr)
769 }
770}
771
/// An `ALTER TABLE` task wrapping the protobuf alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}
777
778impl AlterTableTask {
779 pub fn validate(&self) -> Result<()> {
780 self.alter_table
781 .kind
782 .as_ref()
783 .context(error::UnexpectedSnafu {
784 err_msg: "'kind' is absent",
785 })?;
786 Ok(())
787 }
788
789 pub fn table_ref(&self) -> TableReference<'_> {
790 TableReference {
791 catalog: &self.alter_table.catalog_name,
792 schema: &self.alter_table.schema_name,
793 table: &self.alter_table.table_name,
794 }
795 }
796
797 pub fn table_name(&self) -> TableName {
798 let table = &self.alter_table;
799
800 TableName {
801 catalog_name: table.catalog_name.clone(),
802 schema_name: table.schema_name.clone(),
803 table_name: table.table_name.clone(),
804 }
805 }
806}
807
808impl TryFrom<PbAlterTableTask> for AlterTableTask {
809 type Error = error::Error;
810
811 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
812 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
813 err_msg: "expected alter_table",
814 })?;
815
816 Ok(AlterTableTask { alter_table })
817 }
818}
819
820impl TryFrom<AlterTableTask> for PbAlterTableTask {
821 type Error = error::Error;
822
823 fn try_from(task: AlterTableTask) -> Result<Self> {
824 Ok(PbAlterTableTask {
825 alter_table: Some(task.alter_table),
826 })
827 }
828}
829
830impl Serialize for AlterTableTask {
831 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
832 where
833 S: serde::Serializer,
834 {
835 let pb = PbAlterTableTask {
836 alter_table: Some(self.alter_table.clone()),
837 };
838 let buf = pb.encode_to_vec();
839 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
840 serializer.serialize_str(&encoded)
841 }
842}
843
844impl<'de> Deserialize<'de> for AlterTableTask {
845 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
846 where
847 D: serde::Deserializer<'de>,
848 {
849 let encoded = String::deserialize(deserializer)?;
850 let buf = general_purpose::STANDARD_NO_PAD
851 .decode(encoded)
852 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
853 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
854 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
855
856 let expr = AlterTableTask::try_from(expr)
857 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
858
859 Ok(expr)
860 }
861}
862
/// A `TRUNCATE TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    /// Optional time ranges to restrict truncation; empty means the whole table.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
871
872impl TruncateTableTask {
873 pub fn table_ref(&self) -> TableReference<'_> {
874 TableReference {
875 catalog: &self.catalog,
876 schema: &self.schema,
877 table: &self.table,
878 }
879 }
880
881 pub fn table_name(&self) -> TableName {
882 TableName {
883 catalog_name: self.catalog.clone(),
884 schema_name: self.schema.clone(),
885 table_name: self.table.clone(),
886 }
887 }
888}
889
890impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
891 type Error = error::Error;
892
893 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
894 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
895 err_msg: "expected truncate table",
896 })?;
897
898 Ok(Self {
899 catalog: truncate_table.catalog_name,
900 schema: truncate_table.schema_name,
901 table: truncate_table.table_name,
902 table_id: truncate_table
903 .table_id
904 .context(error::InvalidProtoMsgSnafu {
905 err_msg: "expected table_id",
906 })?
907 .id,
908 time_ranges: truncate_table
909 .time_ranges
910 .map(from_pb_time_ranges)
911 .transpose()
912 .map_err(BoxedError::new)
913 .context(ExternalSnafu)?
914 .unwrap_or_default(),
915 })
916 }
917}
918
919impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
920 type Error = error::Error;
921
922 fn try_from(task: TruncateTableTask) -> Result<Self> {
923 Ok(PbTruncateTableTask {
924 truncate_table: Some(TruncateTableExpr {
925 catalog_name: task.catalog,
926 schema_name: task.schema,
927 table_name: task.table,
928 table_id: Some(api::v1::TableId { id: task.table_id }),
929 time_ranges: Some(
930 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
931 ),
932 }),
933 })
934 }
935}
936
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    /// Database options; deserializes `null` as an empty map for
    /// compatibility with older serialized tasks.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
946
947impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
948 type Error = error::Error;
949
950 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
951 let CreateDatabaseExpr {
952 catalog_name,
953 schema_name,
954 create_if_not_exists,
955 options,
956 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
957 err_msg: "expected create database",
958 })?;
959
960 Ok(CreateDatabaseTask {
961 catalog: catalog_name,
962 schema: schema_name,
963 create_if_not_exists,
964 options,
965 })
966 }
967}
968
969impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
970 type Error = error::Error;
971
972 fn try_from(
973 CreateDatabaseTask {
974 catalog,
975 schema,
976 create_if_not_exists,
977 options,
978 }: CreateDatabaseTask,
979 ) -> Result<Self> {
980 Ok(PbCreateDatabaseTask {
981 create_database: Some(CreateDatabaseExpr {
982 catalog_name: catalog,
983 schema_name: schema,
984 create_if_not_exists,
985 options,
986 }),
987 })
988 }
989}
990
/// A `DROP DATABASE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    /// When true, dropping a nonexistent database is not an error.
    pub drop_if_exists: bool,
}
997
998impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
999 type Error = error::Error;
1000
1001 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1002 let DropDatabaseExpr {
1003 catalog_name,
1004 schema_name,
1005 drop_if_exists,
1006 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1007 err_msg: "expected drop database",
1008 })?;
1009
1010 Ok(DropDatabaseTask {
1011 catalog: catalog_name,
1012 schema: schema_name,
1013 drop_if_exists,
1014 })
1015 }
1016}
1017
1018impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1019 type Error = error::Error;
1020
1021 fn try_from(
1022 DropDatabaseTask {
1023 catalog,
1024 schema,
1025 drop_if_exists,
1026 }: DropDatabaseTask,
1027 ) -> Result<Self> {
1028 Ok(PbDropDatabaseTask {
1029 drop_database: Some(DropDatabaseExpr {
1030 catalog_name: catalog,
1031 schema_name: schema,
1032 drop_if_exists,
1033 }),
1034 })
1035 }
1036}
1037
/// An `ALTER DATABASE` task wrapping the protobuf alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1042
1043impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1044 type Error = error::Error;
1045
1046 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1047 Ok(PbAlterDatabaseTask {
1048 task: Some(task.alter_expr),
1049 })
1050 }
1051}
1052
1053impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1054 type Error = error::Error;
1055
1056 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1057 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1058 err_msg: "expected alter database",
1059 })?;
1060
1061 Ok(AlterDatabaseTask { alter_expr })
1062 }
1063}
1064
1065impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1066 type Error = error::Error;
1067
1068 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1069 match pb {
1070 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1071 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1072 options
1073 .set_database_options
1074 .into_iter()
1075 .map(SetDatabaseOption::try_from)
1076 .collect::<Result<Vec<_>>>()?,
1077 )))
1078 }
1079 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1080 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1081 options
1082 .keys
1083 .iter()
1084 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1085 .collect::<Result<Vec<_>>>()?,
1086 )),
1087 ),
1088 }
1089 }
1090}
1091
/// Database option key for the time-to-live setting.
const TTL_KEY: &str = "ttl";
1093
1094impl TryFrom<PbOption> for SetDatabaseOption {
1095 type Error = error::Error;
1096
1097 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1098 let key_lower = key.to_ascii_lowercase();
1099 match key_lower.as_str() {
1100 TTL_KEY => {
1101 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1102 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1103
1104 Ok(SetDatabaseOption::Ttl(ttl))
1105 }
1106 _ => {
1107 if validate_database_option(&key_lower) {
1108 Ok(SetDatabaseOption::Other(key_lower, value))
1109 } else {
1110 InvalidSetDatabaseOptionSnafu { key, value }.fail()
1111 }
1112 }
1113 }
1114 }
1115}
1116
/// A single validated `SET` option of an `ALTER DATABASE` statement.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// The parsed `ttl` option.
    Ttl(DatabaseTimeToLive),
    /// Any other validated option as a lowercased key/value pair.
    Other(String, String),
}

/// A single validated `UNSET` option of an `ALTER DATABASE` statement.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Unset the `ttl` option.
    Ttl,
    /// Unset any other validated option (lowercased key).
    Other(String),
}
1128
1129impl TryFrom<&str> for UnsetDatabaseOption {
1130 type Error = error::Error;
1131
1132 fn try_from(key: &str) -> Result<Self> {
1133 let key_lower = key.to_ascii_lowercase();
1134 match key_lower.as_str() {
1135 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1136 _ => {
1137 if validate_database_option(&key_lower) {
1138 Ok(UnsetDatabaseOption::Other(key_lower))
1139 } else {
1140 InvalidUnsetDatabaseOptionSnafu { key }.fail()
1141 }
1142 }
1143 }
1144 }
1145}
1146
/// The validated list of options to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);

/// The validated list of option keys to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);

/// The validated, in-memory form of an `ALTER DATABASE` kind.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1158
1159impl AlterDatabaseTask {
1160 pub fn catalog(&self) -> &str {
1161 &self.alter_expr.catalog_name
1162 }
1163
1164 pub fn schema(&self) -> &str {
1165 &self.alter_expr.catalog_name
1166 }
1167}
1168
/// A `CREATE FLOW` task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    /// Tables the flow reads from.
    pub source_table_names: Vec<TableName>,
    /// Table the flow writes into.
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    // Maps to protobuf `ExpireAfter { value }`; unit not visible here —
    // NOTE(review): presumably seconds, confirm against the proto definition.
    pub expire_after: Option<i64>,
    /// Evaluation interval in seconds (protobuf `EvalInterval { seconds }`).
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1185
1186impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1187 type Error = error::Error;
1188
1189 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1190 let CreateFlowExpr {
1191 catalog_name,
1192 flow_name,
1193 source_table_names,
1194 sink_table_name,
1195 or_replace,
1196 create_if_not_exists,
1197 expire_after,
1198 eval_interval,
1199 comment,
1200 sql,
1201 flow_options,
1202 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1203 err_msg: "expected create_flow",
1204 })?;
1205
1206 Ok(CreateFlowTask {
1207 catalog_name,
1208 flow_name,
1209 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1210 sink_table_name: sink_table_name
1211 .context(error::InvalidProtoMsgSnafu {
1212 err_msg: "expected sink_table_name",
1213 })?
1214 .into(),
1215 or_replace,
1216 create_if_not_exists,
1217 expire_after: expire_after.map(|e| e.value),
1218 eval_interval_secs: eval_interval.map(|e| e.seconds),
1219 comment,
1220 sql,
1221 flow_options,
1222 })
1223 }
1224}
1225
1226impl From<CreateFlowTask> for PbCreateFlowTask {
1227 fn from(
1228 CreateFlowTask {
1229 catalog_name,
1230 flow_name,
1231 source_table_names,
1232 sink_table_name,
1233 or_replace,
1234 create_if_not_exists,
1235 expire_after,
1236 eval_interval_secs: eval_interval,
1237 comment,
1238 sql,
1239 flow_options,
1240 }: CreateFlowTask,
1241 ) -> Self {
1242 PbCreateFlowTask {
1243 create_flow: Some(CreateFlowExpr {
1244 catalog_name,
1245 flow_name,
1246 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1247 sink_table_name: Some(sink_table_name.into()),
1248 or_replace,
1249 create_if_not_exists,
1250 expire_after: expire_after.map(|value| ExpireAfter { value }),
1251 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1252 comment,
1253 sql,
1254 flow_options,
1255 }),
1256 }
1257 }
1258}
1259
/// Parameters of a `DROP FLOW` DDL task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    /// Numeric id of the flow to drop (required, unlike in the proto).
    pub flow_id: FlowId,
    /// Whether `IF EXISTS` was specified.
    pub drop_if_exists: bool,
}
1268
1269impl TryFrom<PbDropFlowTask> for DropFlowTask {
1270 type Error = error::Error;
1271
1272 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1273 let DropFlowExpr {
1274 catalog_name,
1275 flow_name,
1276 flow_id,
1277 drop_if_exists,
1278 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1279 err_msg: "expected drop_flow",
1280 })?;
1281 let flow_id = flow_id
1282 .context(error::InvalidProtoMsgSnafu {
1283 err_msg: "expected flow_id",
1284 })?
1285 .id;
1286 Ok(DropFlowTask {
1287 catalog_name,
1288 flow_name,
1289 flow_id,
1290 drop_if_exists,
1291 })
1292 }
1293}
1294
1295impl From<DropFlowTask> for PbDropFlowTask {
1296 fn from(
1297 DropFlowTask {
1298 catalog_name,
1299 flow_name,
1300 flow_id,
1301 drop_if_exists,
1302 }: DropFlowTask,
1303 ) -> Self {
1304 PbDropFlowTask {
1305 drop_flow: Some(DropFlowExpr {
1306 catalog_name,
1307 flow_name,
1308 flow_id: Some(api::v1::FlowId { id: flow_id }),
1309 drop_if_exists,
1310 }),
1311 }
1312 }
1313}
1314
/// Resolved id of the object a comment is attached to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    Table(TableId),
    Flow(FlowId),
}
1321
/// Parameters of a `COMMENT ON` DDL task.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    pub catalog_name: String,
    pub schema_name: String,
    /// What kind of object the comment targets (table, column, or flow).
    pub object_type: CommentObjectType,
    pub object_name: String,
    /// Set only when commenting on a column (`object_type == Column`) —
    /// TODO confirm; the proto conversion just maps empty string to `None`.
    pub column_name: Option<String>,
    /// Resolved id; not carried over the wire — populated later by the
    /// procedure (always `None` right after proto conversion).
    pub object_id: Option<CommentObjectId>,
    /// `None` clears the comment; `Some` sets it.
    pub comment: Option<String>,
}
1335
/// Kind of object targeted by a `COMMENT ON` statement.
/// Mirrors the protobuf `CommentObjectType` (0 = Table, 1 = Column, 2 = Flow).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    Table,
    Column,
    Flow,
}
1342
1343impl CommentOnTask {
1344 pub fn table_ref(&self) -> TableReference<'_> {
1345 TableReference {
1346 catalog: &self.catalog_name,
1347 schema: &self.schema_name,
1348 table: &self.object_name,
1349 }
1350 }
1351}
1352
1353impl From<CommentObjectType> for PbCommentObjectType {
1355 fn from(object_type: CommentObjectType) -> Self {
1356 match object_type {
1357 CommentObjectType::Table => PbCommentObjectType::Table,
1358 CommentObjectType::Column => PbCommentObjectType::Column,
1359 CommentObjectType::Flow => PbCommentObjectType::Flow,
1360 }
1361 }
1362}
1363
1364impl TryFrom<i32> for CommentObjectType {
1365 type Error = error::Error;
1366
1367 fn try_from(value: i32) -> Result<Self> {
1368 match value {
1369 0 => Ok(CommentObjectType::Table),
1370 1 => Ok(CommentObjectType::Column),
1371 2 => Ok(CommentObjectType::Flow),
1372 _ => error::InvalidProtoMsgSnafu {
1373 err_msg: format!(
1374 "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1375 value
1376 ),
1377 }
1378 .fail(),
1379 }
1380 }
1381}
1382
1383impl TryFrom<PbCommentOnTask> for CommentOnTask {
1385 type Error = error::Error;
1386
1387 fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1388 let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1389 err_msg: "expected comment_on",
1390 })?;
1391
1392 Ok(CommentOnTask {
1393 catalog_name: comment_on.catalog_name,
1394 schema_name: comment_on.schema_name,
1395 object_type: comment_on.object_type.try_into()?,
1396 object_name: comment_on.object_name,
1397 column_name: if comment_on.column_name.is_empty() {
1398 None
1399 } else {
1400 Some(comment_on.column_name)
1401 },
1402 comment: if comment_on.comment.is_empty() {
1403 None
1404 } else {
1405 Some(comment_on.comment)
1406 },
1407 object_id: None,
1408 })
1409 }
1410}
1411
1412impl From<CommentOnTask> for PbCommentOnTask {
1413 fn from(task: CommentOnTask) -> Self {
1414 let pb_object_type: PbCommentObjectType = task.object_type.into();
1415 PbCommentOnTask {
1416 comment_on: Some(CommentOnExpr {
1417 catalog_name: task.catalog_name,
1418 schema_name: task.schema_name,
1419 object_type: pb_object_type as i32,
1420 object_name: task.object_name,
1421 column_name: task.column_name.unwrap_or_default(),
1422 comment: task.comment.unwrap_or_default(),
1423 }),
1424 }
1425 }
1426}
1427
/// Session context attached to a DDL task (serializable mirror of the
/// protobuf `QueryContext`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct QueryContext {
    pub current_catalog: String,
    pub current_schema: String,
    pub timezone: String,
    /// Arbitrary key/value extensions forwarded with the request.
    pub extensions: HashMap<String, String>,
    /// Channel id, truncated from the proto's u32 — values above 255 wrap.
    pub channel: u8,
}
1436
1437impl QueryContext {
1438 pub fn current_catalog(&self) -> &str {
1440 &self.current_catalog
1441 }
1442
1443 pub fn current_schema(&self) -> &str {
1445 &self.current_schema
1446 }
1447
1448 pub fn timezone(&self) -> &str {
1450 &self.timezone
1451 }
1452
1453 pub fn extensions(&self) -> &HashMap<String, String> {
1455 &self.extensions
1456 }
1457
1458 pub fn channel(&self) -> u8 {
1460 self.channel
1461 }
1462}
1463
/// Trimmed-down query context stored with flows: only the fields a flow
/// needs (catalog, schema, timezone). `Deserialize` is hand-written below
/// for backward compatibility with the full [`QueryContext`] layout.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    pub catalog: String,
    pub schema: String,
    pub timezone: String,
}
1476
// Hand-written deserializer so old values persisted as a full `QueryContext`
// (`current_catalog`/`current_schema`/... fields) still deserialize into the
// new, slimmer `FlowQueryContext` (`catalog`/`schema`/`timezone` fields).
impl<'de> Deserialize<'de> for FlowQueryContext {
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries variants in declaration order, so the new
        // (Flow) layout is attempted first, then the legacy (Full) layout.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Mirror of `FlowQueryContext` used to avoid recursing into this
        // custom Deserialize impl.
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            // Legacy layout: drop extensions/channel via the From impl below.
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1507
1508impl From<PbQueryContext> for QueryContext {
1509 fn from(pb_ctx: PbQueryContext) -> Self {
1510 Self {
1511 current_catalog: pb_ctx.current_catalog,
1512 current_schema: pb_ctx.current_schema,
1513 timezone: pb_ctx.timezone,
1514 extensions: pb_ctx.extensions,
1515 channel: pb_ctx.channel as u8,
1516 }
1517 }
1518}
1519
1520impl From<QueryContext> for PbQueryContext {
1521 fn from(
1522 QueryContext {
1523 current_catalog,
1524 current_schema,
1525 timezone,
1526 extensions,
1527 channel,
1528 }: QueryContext,
1529 ) -> Self {
1530 PbQueryContext {
1531 current_catalog,
1532 current_schema,
1533 timezone,
1534 extensions,
1535 channel: channel as u32,
1536 snapshot_seqs: None,
1537 explain: None,
1538 }
1539 }
1540}
1541
1542impl From<QueryContext> for FlowQueryContext {
1543 fn from(ctx: QueryContext) -> Self {
1544 Self {
1545 catalog: ctx.current_catalog,
1546 schema: ctx.current_schema,
1547 timezone: ctx.timezone,
1548 }
1549 }
1550}
1551
1552impl From<FlowQueryContext> for QueryContext {
1553 fn from(flow_ctx: FlowQueryContext) -> Self {
1554 Self {
1555 current_catalog: flow_ctx.catalog,
1556 current_schema: flow_ctx.schema,
1557 timezone: flow_ctx.timezone,
1558 extensions: HashMap::new(),
1559 channel: 0, }
1561 }
1562}
1563
1564impl From<FlowQueryContext> for PbQueryContext {
1565 fn from(flow_ctx: FlowQueryContext) -> Self {
1566 let query_ctx: QueryContext = flow_ctx.into();
1567 query_ctx.into()
1568 }
1569}
1570
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{TableInfo, TableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    // Round-trips a CreateTableTask through serde_json and checks equality.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // Round-trips an AlterTableTask through serde_json and checks equality.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // Verifies that sort_columns() reorders column defs alphabetically and
    // remaps the table-meta indices (timestamp/primary-key/value) to match.
    #[test]
    fn test_sort_columns() {
        // Columns deliberately declared out of order: column3 (tag),
        // column1 (time index), column2 (field).
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                "column3".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "column1".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "column2".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ]));

        // Indices here refer to the pre-sort column order above.
        let meta = TableMeta {
            schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = TableInfo {
            ident: Default::default(),
            meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // Column defs are now in alphabetical order.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Indices are remapped to the sorted order: column1 is now index 0.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index(),
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    // QueryContext -> FlowQueryContext drops extensions and channel.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    // FlowQueryContext -> QueryContext uses neutral defaults and round-trips.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    // FlowQueryContext serializes with its own short field names.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    // FlowQueryContext -> PbQueryContext fills unset proto fields with defaults.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}