1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27 AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28 CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29 CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30 CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31 DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32 DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33 DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34 TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37 AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38 CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39 DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40 QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57 InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// A DDL operation, with one variant per supported kind of task.
///
/// Values are built with the `new_*` constructors on this type or converted
/// from the protobuf [`Task`]. The trigger variants only exist when the
/// `enterprise` feature is enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a single table.
    CreateTable(CreateTableTask),
    /// Drop a single table.
    DropTable(DropTableTask),
    /// Alter a single table.
    AlterTable(AlterTableTask),
    /// Truncate a single table.
    TruncateTable(TruncateTableTask),
    /// A batch of create-table tasks (protobuf `CreateTableTasks`).
    CreateLogicalTables(Vec<CreateTableTask>),
    /// A batch of drop-table tasks (protobuf `DropTableTasks`).
    DropLogicalTables(Vec<DropTableTask>),
    /// A batch of alter-table tasks (protobuf `AlterTableTasks`).
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter a database.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Drop a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
    /// Create a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    /// Comment on an object.
    CommentOn(CommentOnTask),
}
84
85impl DdlTask {
86 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
88 DdlTask::CreateFlow(expr)
89 }
90
91 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
93 DdlTask::DropFlow(expr)
94 }
95
96 pub fn new_drop_view(expr: DropViewTask) -> Self {
98 DdlTask::DropView(expr)
99 }
100
101 pub fn new_create_table(
103 expr: CreateTableExpr,
104 partitions: Vec<Partition>,
105 table_info: TableInfo,
106 ) -> Self {
107 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
108 }
109
110 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
112 DdlTask::CreateLogicalTables(
113 table_data
114 .into_iter()
115 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
116 .collect(),
117 )
118 }
119
120 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
122 DdlTask::AlterLogicalTables(
123 table_data
124 .into_iter()
125 .map(|alter_table| AlterTableTask { alter_table })
126 .collect(),
127 )
128 }
129
130 pub fn new_drop_table(
132 catalog: String,
133 schema: String,
134 table: String,
135 table_id: TableId,
136 drop_if_exists: bool,
137 ) -> Self {
138 DdlTask::DropTable(DropTableTask {
139 catalog,
140 schema,
141 table,
142 table_id,
143 drop_if_exists,
144 })
145 }
146
147 pub fn new_create_database(
149 catalog: String,
150 schema: String,
151 create_if_not_exists: bool,
152 options: HashMap<String, String>,
153 ) -> Self {
154 DdlTask::CreateDatabase(CreateDatabaseTask {
155 catalog,
156 schema,
157 create_if_not_exists,
158 options,
159 })
160 }
161
162 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
164 DdlTask::DropDatabase(DropDatabaseTask {
165 catalog,
166 schema,
167 drop_if_exists,
168 })
169 }
170
171 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
173 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
174 }
175
176 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
178 DdlTask::AlterTable(AlterTableTask { alter_table })
179 }
180
181 pub fn new_truncate_table(
183 catalog: String,
184 schema: String,
185 table: String,
186 table_id: TableId,
187 time_ranges: Vec<(Timestamp, Timestamp)>,
188 ) -> Self {
189 DdlTask::TruncateTable(TruncateTableTask {
190 catalog,
191 schema,
192 table,
193 table_id,
194 time_ranges,
195 })
196 }
197
198 pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
200 DdlTask::CreateView(CreateViewTask {
201 create_view,
202 view_info,
203 })
204 }
205
206 pub fn new_comment_on(task: CommentOnTask) -> Self {
208 DdlTask::CommentOn(task)
209 }
210}
211
212impl TryFrom<Task> for DdlTask {
213 type Error = error::Error;
214 fn try_from(task: Task) -> Result<Self> {
215 match task {
216 Task::CreateTableTask(create_table) => {
217 Ok(DdlTask::CreateTable(create_table.try_into()?))
218 }
219 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
220 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
221 Task::TruncateTableTask(truncate_table) => {
222 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
223 }
224 Task::CreateTableTasks(create_tables) => {
225 let tasks = create_tables
226 .tasks
227 .into_iter()
228 .map(|task| task.try_into())
229 .collect::<Result<Vec<_>>>()?;
230
231 Ok(DdlTask::CreateLogicalTables(tasks))
232 }
233 Task::DropTableTasks(drop_tables) => {
234 let tasks = drop_tables
235 .tasks
236 .into_iter()
237 .map(|task| task.try_into())
238 .collect::<Result<Vec<_>>>()?;
239
240 Ok(DdlTask::DropLogicalTables(tasks))
241 }
242 Task::AlterTableTasks(alter_tables) => {
243 let tasks = alter_tables
244 .tasks
245 .into_iter()
246 .map(|task| task.try_into())
247 .collect::<Result<Vec<_>>>()?;
248
249 Ok(DdlTask::AlterLogicalTables(tasks))
250 }
251 Task::CreateDatabaseTask(create_database) => {
252 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
253 }
254 Task::DropDatabaseTask(drop_database) => {
255 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
256 }
257 Task::AlterDatabaseTask(alter_database) => {
258 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
259 }
260 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
261 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
262 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
263 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
264 Task::CreateTriggerTask(create_trigger) => {
265 #[cfg(feature = "enterprise")]
266 return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
267 #[cfg(not(feature = "enterprise"))]
268 {
269 let _ = create_trigger;
270 crate::error::UnsupportedSnafu {
271 operation: "create trigger",
272 }
273 .fail()
274 }
275 }
276 Task::DropTriggerTask(drop_trigger) => {
277 #[cfg(feature = "enterprise")]
278 return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
279 #[cfg(not(feature = "enterprise"))]
280 {
281 let _ = drop_trigger;
282 crate::error::UnsupportedSnafu {
283 operation: "drop trigger",
284 }
285 .fail()
286 }
287 }
288 Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
289 }
290 }
291}
292
/// A request to submit a [`DdlTask`], convertible into [`PbDdlTaskRequest`].
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context forwarded in the protobuf request.
    pub query_context: QueryContext,
    /// Copied into the protobuf request's `wait` field.
    pub wait: bool,
    /// Timeout; sent in the protobuf request as whole seconds.
    pub timeout: Duration,
    /// The DDL task to submit.
    pub task: DdlTask,
}
300
301impl SubmitDdlTaskRequest {
302 pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
304 Self {
305 query_context,
306 wait: Self::default_wait(),
307 timeout: Self::default_timeout(),
308 task,
309 }
310 }
311
312 pub fn default_timeout() -> Duration {
314 Duration::from_secs(60)
315 }
316
317 pub fn default_wait() -> bool {
319 true
320 }
321}
322
323impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
324 type Error = error::Error;
325
326 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
327 let task = match request.task {
328 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
329 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
330 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
331 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
332 DdlTask::CreateLogicalTables(tasks) => {
333 let tasks = tasks
334 .into_iter()
335 .map(|task| task.try_into())
336 .collect::<Result<Vec<_>>>()?;
337
338 Task::CreateTableTasks(PbCreateTableTasks { tasks })
339 }
340 DdlTask::DropLogicalTables(tasks) => {
341 let tasks = tasks
342 .into_iter()
343 .map(|task| task.into())
344 .collect::<Vec<_>>();
345
346 Task::DropTableTasks(PbDropTableTasks { tasks })
347 }
348 DdlTask::AlterLogicalTables(tasks) => {
349 let tasks = tasks
350 .into_iter()
351 .map(|task| task.try_into())
352 .collect::<Result<Vec<_>>>()?;
353
354 Task::AlterTableTasks(PbAlterTableTasks { tasks })
355 }
356 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
357 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
358 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
359 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
360 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
361 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
362 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
363 #[cfg(feature = "enterprise")]
364 DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
365 #[cfg(feature = "enterprise")]
366 DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
367 DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
368 };
369
370 Ok(Self {
371 header: None,
372 query_context: Some(request.query_context.into()),
373 timeout_secs: request.timeout.as_secs() as u32,
374 wait: request.wait,
375 task: Some(task),
376 })
377 }
378}
379
/// The response to a submitted DDL task, convertible to and from
/// [`PbDdlTaskResponse`].
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Key of the protobuf `ProcedureId` (`pid`); empty when the protobuf
    /// response carried no `pid`.
    pub key: Vec<u8>,
    /// Table ids carried in the response.
    pub table_ids: Vec<TableId>,
}
386
387impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
388 type Error = error::Error;
389
390 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
391 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
392 Ok(Self {
393 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
394 table_ids,
395 })
396 }
397}
398
399impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
400 fn from(val: SubmitDdlTaskResponse) -> Self {
401 Self {
402 pid: Some(ProcedureId { key: val.key }),
403 table_ids: val
404 .table_ids
405 .into_iter()
406 .map(|id| api::v1::TableId { id })
407 .collect(),
408 ..Default::default()
409 }
410 }
411}
412
/// A task to create a view.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbCreateViewTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The create-view expression.
    pub create_view: CreateViewExpr,
    /// Table info of the view; JSON-encoded when converted to protobuf.
    pub view_info: TableInfo,
}
419
420impl CreateViewTask {
421 pub fn table_ref(&self) -> TableReference<'_> {
423 TableReference {
424 catalog: &self.create_view.catalog_name,
425 schema: &self.create_view.schema_name,
426 table: &self.create_view.view_name,
427 }
428 }
429
430 pub fn raw_logical_plan(&self) -> &Vec<u8> {
432 &self.create_view.logical_plan
433 }
434
435 pub fn view_definition(&self) -> &str {
437 &self.create_view.definition
438 }
439
440 pub fn table_names(&self) -> HashSet<TableName> {
442 self.create_view
443 .table_names
444 .iter()
445 .map(|t| t.clone().into())
446 .collect()
447 }
448
449 pub fn columns(&self) -> &Vec<String> {
451 &self.create_view.columns
452 }
453
454 pub fn plan_columns(&self) -> &Vec<String> {
456 &self.create_view.plan_columns
457 }
458}
459
460impl TryFrom<PbCreateViewTask> for CreateViewTask {
461 type Error = error::Error;
462
463 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
464 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
465
466 Ok(CreateViewTask {
467 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
468 err_msg: "expected create view",
469 })?,
470 view_info,
471 })
472 }
473}
474
475impl TryFrom<CreateViewTask> for PbCreateViewTask {
476 type Error = error::Error;
477
478 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
479 Ok(PbCreateViewTask {
480 create_view: Some(task.create_view),
481 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
482 })
483 }
484}
485
486impl Serialize for CreateViewTask {
487 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
488 where
489 S: serde::Serializer,
490 {
491 let view_info = serde_json::to_vec(&self.view_info)
492 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
493
494 let pb = PbCreateViewTask {
495 create_view: Some(self.create_view.clone()),
496 view_info,
497 };
498 let buf = pb.encode_to_vec();
499 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
500 serializer.serialize_str(&encoded)
501 }
502}
503
504impl<'de> Deserialize<'de> for CreateViewTask {
505 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
506 where
507 D: serde::Deserializer<'de>,
508 {
509 let encoded = String::deserialize(deserializer)?;
510 let buf = general_purpose::STANDARD_NO_PAD
511 .decode(encoded)
512 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
513 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
514 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
515
516 let expr = CreateViewTask::try_from(expr)
517 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
518
519 Ok(expr)
520 }
521}
522
/// A task to drop a view.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog name of the view.
    pub catalog: String,
    /// Schema name of the view.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// Id of the view.
    pub view_id: TableId,
    /// Copied to and from `DropViewExpr::drop_if_exists`.
    pub drop_if_exists: bool,
}
532
533impl DropViewTask {
534 pub fn table_ref(&self) -> TableReference<'_> {
536 TableReference {
537 catalog: &self.catalog,
538 schema: &self.schema,
539 table: &self.view,
540 }
541 }
542}
543
544impl TryFrom<PbDropViewTask> for DropViewTask {
545 type Error = error::Error;
546
547 fn try_from(pb: PbDropViewTask) -> Result<Self> {
548 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
549 err_msg: "expected drop view",
550 })?;
551
552 Ok(DropViewTask {
553 catalog: expr.catalog_name,
554 schema: expr.schema_name,
555 view: expr.view_name,
556 view_id: expr
557 .view_id
558 .context(error::InvalidProtoMsgSnafu {
559 err_msg: "expected view_id",
560 })?
561 .id,
562 drop_if_exists: expr.drop_if_exists,
563 })
564 }
565}
566
567impl From<DropViewTask> for PbDropViewTask {
568 fn from(task: DropViewTask) -> Self {
569 PbDropViewTask {
570 drop_view: Some(DropViewExpr {
571 catalog_name: task.catalog,
572 schema_name: task.schema,
573 view_name: task.view,
574 view_id: Some(api::v1::TableId { id: task.view_id }),
575 drop_if_exists: task.drop_if_exists,
576 }),
577 }
578 }
579}
580
/// A task to drop a table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Copied to and from `DropTableExpr::drop_if_exists`; deserializes to
    /// `false` when the field is missing from the input.
    #[serde(default)]
    pub drop_if_exists: bool,
}
590
591impl DropTableTask {
592 pub fn table_ref(&self) -> TableReference<'_> {
593 TableReference {
594 catalog: &self.catalog,
595 schema: &self.schema,
596 table: &self.table,
597 }
598 }
599
600 pub fn table_name(&self) -> TableName {
601 TableName {
602 catalog_name: self.catalog.clone(),
603 schema_name: self.schema.clone(),
604 table_name: self.table.clone(),
605 }
606 }
607}
608
609impl TryFrom<PbDropTableTask> for DropTableTask {
610 type Error = error::Error;
611
612 fn try_from(pb: PbDropTableTask) -> Result<Self> {
613 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
614 err_msg: "expected drop table",
615 })?;
616
617 Ok(Self {
618 catalog: drop_table.catalog_name,
619 schema: drop_table.schema_name,
620 table: drop_table.table_name,
621 table_id: drop_table
622 .table_id
623 .context(error::InvalidProtoMsgSnafu {
624 err_msg: "expected table_id",
625 })?
626 .id,
627 drop_if_exists: drop_table.drop_if_exists,
628 })
629 }
630}
631
632impl From<DropTableTask> for PbDropTableTask {
633 fn from(task: DropTableTask) -> Self {
634 PbDropTableTask {
635 drop_table: Some(DropTableExpr {
636 catalog_name: task.catalog,
637 schema_name: task.schema,
638 table_name: task.table,
639 table_id: Some(api::v1::TableId { id: task.table_id }),
640 drop_if_exists: task.drop_if_exists,
641 }),
642 }
643 }
644}
645
/// A task to create a table.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbCreateTableTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The create-table expression.
    pub create_table: CreateTableExpr,
    /// Partitions of the table.
    pub partitions: Vec<Partition>,
    /// Table info; JSON-encoded when converted to protobuf.
    pub table_info: TableInfo,
}
652
653impl TryFrom<PbCreateTableTask> for CreateTableTask {
654 type Error = error::Error;
655
656 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
657 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
658
659 Ok(CreateTableTask::new(
660 pb.create_table.context(error::InvalidProtoMsgSnafu {
661 err_msg: "expected create table",
662 })?,
663 pb.partitions,
664 table_info,
665 ))
666 }
667}
668
669impl TryFrom<CreateTableTask> for PbCreateTableTask {
670 type Error = error::Error;
671
672 fn try_from(task: CreateTableTask) -> Result<Self> {
673 Ok(PbCreateTableTask {
674 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
675 create_table: Some(task.create_table),
676 partitions: task.partitions,
677 })
678 }
679}
680
681impl CreateTableTask {
682 pub fn new(
683 expr: CreateTableExpr,
684 partitions: Vec<Partition>,
685 table_info: TableInfo,
686 ) -> CreateTableTask {
687 CreateTableTask {
688 create_table: expr,
689 partitions,
690 table_info,
691 }
692 }
693
694 pub fn table_name(&self) -> TableName {
695 let table = &self.create_table;
696
697 TableName {
698 catalog_name: table.catalog_name.clone(),
699 schema_name: table.schema_name.clone(),
700 table_name: table.table_name.clone(),
701 }
702 }
703
704 pub fn table_ref(&self) -> TableReference<'_> {
705 let table = &self.create_table;
706
707 TableReference {
708 catalog: &table.catalog_name,
709 schema: &table.schema_name,
710 table: &table.table_name,
711 }
712 }
713
714 pub fn set_table_id(&mut self, table_id: TableId) {
716 self.table_info.ident.table_id = table_id;
717 }
718
719 pub fn sort_columns(&mut self) {
724 self.create_table
727 .column_defs
728 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
729
730 self.table_info.sort_columns();
731 }
732}
733
734impl Serialize for CreateTableTask {
735 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
736 where
737 S: serde::Serializer,
738 {
739 let table_info = serde_json::to_vec(&self.table_info)
740 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
741
742 let pb = PbCreateTableTask {
743 create_table: Some(self.create_table.clone()),
744 partitions: self.partitions.clone(),
745 table_info,
746 };
747 let buf = pb.encode_to_vec();
748 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
749 serializer.serialize_str(&encoded)
750 }
751}
752
753impl<'de> Deserialize<'de> for CreateTableTask {
754 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
755 where
756 D: serde::Deserializer<'de>,
757 {
758 let encoded = String::deserialize(deserializer)?;
759 let buf = general_purpose::STANDARD_NO_PAD
760 .decode(encoded)
761 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
762 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
763 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
764
765 let expr = CreateTableTask::try_from(expr)
766 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
767
768 Ok(expr)
769 }
770}
771
/// A task to alter a table.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbAlterTableTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    /// The alter-table expression.
    pub alter_table: AlterTableExpr,
}
777
778impl AlterTableTask {
779 pub fn validate(&self) -> Result<()> {
780 self.alter_table
781 .kind
782 .as_ref()
783 .context(error::UnexpectedSnafu {
784 err_msg: "'kind' is absent",
785 })?;
786 Ok(())
787 }
788
789 pub fn table_ref(&self) -> TableReference<'_> {
790 TableReference {
791 catalog: &self.alter_table.catalog_name,
792 schema: &self.alter_table.schema_name,
793 table: &self.alter_table.table_name,
794 }
795 }
796
797 pub fn table_name(&self) -> TableName {
798 let table = &self.alter_table;
799
800 TableName {
801 catalog_name: table.catalog_name.clone(),
802 schema_name: table.schema_name.clone(),
803 table_name: table.table_name.clone(),
804 }
805 }
806}
807
808impl TryFrom<PbAlterTableTask> for AlterTableTask {
809 type Error = error::Error;
810
811 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
812 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
813 err_msg: "expected alter_table",
814 })?;
815
816 Ok(AlterTableTask { alter_table })
817 }
818}
819
820impl TryFrom<AlterTableTask> for PbAlterTableTask {
821 type Error = error::Error;
822
823 fn try_from(task: AlterTableTask) -> Result<Self> {
824 Ok(PbAlterTableTask {
825 alter_table: Some(task.alter_table),
826 })
827 }
828}
829
830impl Serialize for AlterTableTask {
831 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
832 where
833 S: serde::Serializer,
834 {
835 let pb = PbAlterTableTask {
836 alter_table: Some(self.alter_table.clone()),
837 };
838 let buf = pb.encode_to_vec();
839 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
840 serializer.serialize_str(&encoded)
841 }
842}
843
844impl<'de> Deserialize<'de> for AlterTableTask {
845 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
846 where
847 D: serde::Deserializer<'de>,
848 {
849 let encoded = String::deserialize(deserializer)?;
850 let buf = general_purpose::STANDARD_NO_PAD
851 .decode(encoded)
852 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
853 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
854 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
855
856 let expr = AlterTableTask::try_from(expr)
857 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
858
859 Ok(expr)
860 }
861}
862
/// A task to truncate a table.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Time ranges as pairs of timestamps; empty when the protobuf expression
    /// carried no `time_ranges`.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
871
872impl TruncateTableTask {
873 pub fn table_ref(&self) -> TableReference<'_> {
874 TableReference {
875 catalog: &self.catalog,
876 schema: &self.schema,
877 table: &self.table,
878 }
879 }
880
881 pub fn table_name(&self) -> TableName {
882 TableName {
883 catalog_name: self.catalog.clone(),
884 schema_name: self.schema.clone(),
885 table_name: self.table.clone(),
886 }
887 }
888}
889
890impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
891 type Error = error::Error;
892
893 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
894 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
895 err_msg: "expected truncate table",
896 })?;
897
898 Ok(Self {
899 catalog: truncate_table.catalog_name,
900 schema: truncate_table.schema_name,
901 table: truncate_table.table_name,
902 table_id: truncate_table
903 .table_id
904 .context(error::InvalidProtoMsgSnafu {
905 err_msg: "expected table_id",
906 })?
907 .id,
908 time_ranges: truncate_table
909 .time_ranges
910 .map(from_pb_time_ranges)
911 .transpose()
912 .map_err(BoxedError::new)
913 .context(ExternalSnafu)?
914 .unwrap_or_default(),
915 })
916 }
917}
918
919impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
920 type Error = error::Error;
921
922 fn try_from(task: TruncateTableTask) -> Result<Self> {
923 Ok(PbTruncateTableTask {
924 truncate_table: Some(TruncateTableExpr {
925 catalog_name: task.catalog,
926 schema_name: task.schema,
927 table_name: task.table,
928 table_id: Some(api::v1::TableId { id: task.table_id }),
929 time_ranges: Some(
930 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
931 ),
932 }),
933 })
934 }
935}
936
/// A task to create a database.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    /// Catalog name of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
    /// Copied to and from `CreateDatabaseExpr::create_if_not_exists`.
    pub create_if_not_exists: bool,
    /// Database options; a `null` in the serialized form deserializes to the
    /// default (empty) map.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
946
947impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
948 type Error = error::Error;
949
950 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
951 let CreateDatabaseExpr {
952 catalog_name,
953 schema_name,
954 create_if_not_exists,
955 options,
956 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
957 err_msg: "expected create database",
958 })?;
959
960 Ok(CreateDatabaseTask {
961 catalog: catalog_name,
962 schema: schema_name,
963 create_if_not_exists,
964 options,
965 })
966 }
967}
968
969impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
970 type Error = error::Error;
971
972 fn try_from(
973 CreateDatabaseTask {
974 catalog,
975 schema,
976 create_if_not_exists,
977 options,
978 }: CreateDatabaseTask,
979 ) -> Result<Self> {
980 Ok(PbCreateDatabaseTask {
981 create_database: Some(CreateDatabaseExpr {
982 catalog_name: catalog,
983 schema_name: schema,
984 create_if_not_exists,
985 options,
986 }),
987 })
988 }
989}
990
/// A task to drop a database.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    /// Catalog name of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
    /// Copied to and from `DropDatabaseExpr::drop_if_exists`.
    pub drop_if_exists: bool,
}
997
998impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
999 type Error = error::Error;
1000
1001 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1002 let DropDatabaseExpr {
1003 catalog_name,
1004 schema_name,
1005 drop_if_exists,
1006 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1007 err_msg: "expected drop database",
1008 })?;
1009
1010 Ok(DropDatabaseTask {
1011 catalog: catalog_name,
1012 schema: schema_name,
1013 drop_if_exists,
1014 })
1015 }
1016}
1017
1018impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1019 type Error = error::Error;
1020
1021 fn try_from(
1022 DropDatabaseTask {
1023 catalog,
1024 schema,
1025 drop_if_exists,
1026 }: DropDatabaseTask,
1027 ) -> Result<Self> {
1028 Ok(PbDropDatabaseTask {
1029 drop_database: Some(DropDatabaseExpr {
1030 catalog_name: catalog,
1031 schema_name: schema,
1032 drop_if_exists,
1033 }),
1034 })
1035 }
1036}
1037
/// A task to alter a database.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    /// The alter-database expression.
    pub alter_expr: AlterDatabaseExpr,
}
1042
1043impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1044 type Error = error::Error;
1045
1046 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1047 Ok(PbAlterDatabaseTask {
1048 task: Some(task.alter_expr),
1049 })
1050 }
1051}
1052
1053impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1054 type Error = error::Error;
1055
1056 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1057 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1058 err_msg: "expected alter database",
1059 })?;
1060
1061 Ok(AlterDatabaseTask { alter_expr })
1062 }
1063}
1064
1065impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1066 type Error = error::Error;
1067
1068 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1069 match pb {
1070 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1071 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1072 options
1073 .set_database_options
1074 .into_iter()
1075 .map(SetDatabaseOption::try_from)
1076 .collect::<Result<Vec<_>>>()?,
1077 )))
1078 }
1079 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1080 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1081 options
1082 .keys
1083 .iter()
1084 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1085 .collect::<Result<Vec<_>>>()?,
1086 )),
1087 ),
1088 }
1089 }
1090}
1091
/// Lower-case option key that selects the TTL database option; incoming keys
/// are lower-cased before being compared against it.
const TTL_KEY: &str = "ttl";
1093
1094impl TryFrom<PbOption> for SetDatabaseOption {
1095 type Error = error::Error;
1096
1097 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1098 let key_lower = key.to_ascii_lowercase();
1099 match key_lower.as_str() {
1100 TTL_KEY => {
1101 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1102 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1103
1104 Ok(SetDatabaseOption::Ttl(ttl))
1105 }
1106 _ => {
1107 if validate_database_option(&key_lower) {
1108 Ok(SetDatabaseOption::Other(key_lower, value))
1109 } else {
1110 InvalidSetDatabaseOptionSnafu { key, value }.fail()
1111 }
1112 }
1113 }
1114 }
1115}
1116
/// A single database option to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// Set the database TTL to the given value.
    Ttl(DatabaseTimeToLive),
    /// Any other accepted option, as a `(key, value)` pair.
    Other(String, String),
}
1122
/// A single database option to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Unset the database TTL.
    Ttl,
    /// Any other accepted option, identified by its key.
    Other(String),
}
1128
1129impl TryFrom<&str> for UnsetDatabaseOption {
1130 type Error = error::Error;
1131
1132 fn try_from(key: &str) -> Result<Self> {
1133 let key_lower = key.to_ascii_lowercase();
1134 match key_lower.as_str() {
1135 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1136 _ => {
1137 if validate_database_option(&key_lower) {
1138 Ok(UnsetDatabaseOption::Other(key_lower))
1139 } else {
1140 InvalidUnsetDatabaseOptionSnafu { key }.fail()
1141 }
1142 }
1143 }
1144 }
1145}
1146
/// A list of database options to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1149
/// A list of database options to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1152
/// The kind of change an alter-database request makes.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    /// Set the given database options.
    SetDatabaseOptions(SetDatabaseOptions),
    /// Unset the given database options.
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1158
1159impl AlterDatabaseTask {
1160 pub fn catalog(&self) -> &str {
1161 &self.alter_expr.catalog_name
1162 }
1163
1164 pub fn schema(&self) -> &str {
1165 &self.alter_expr.catalog_name
1166 }
1167}
1168
/// A task to create a flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    /// Catalog name of the flow.
    pub catalog_name: String,
    /// Name of the flow.
    pub flow_name: String,
    /// Names of the flow's source tables.
    pub source_table_names: Vec<TableName>,
    /// Name of the flow's sink table.
    pub sink_table_name: TableName,
    /// Copied to and from `CreateFlowExpr::or_replace`.
    pub or_replace: bool,
    /// Copied to and from `CreateFlowExpr::create_if_not_exists`.
    pub create_if_not_exists: bool,
    /// Optional expire-after value; carried in protobuf as `ExpireAfter { value }`.
    pub expire_after: Option<i64>,
    /// Optional evaluation interval in seconds; carried in protobuf as
    /// `EvalInterval { seconds }`.
    pub eval_interval_secs: Option<i64>,
    /// Comment attached to the flow.
    pub comment: String,
    /// SQL of the flow.
    pub sql: String,
    /// Additional flow options as key/value pairs.
    pub flow_options: HashMap<String, String>,
}
1185
1186impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1187 type Error = error::Error;
1188
1189 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1190 let CreateFlowExpr {
1191 catalog_name,
1192 flow_name,
1193 source_table_names,
1194 sink_table_name,
1195 or_replace,
1196 create_if_not_exists,
1197 expire_after,
1198 eval_interval,
1199 comment,
1200 sql,
1201 flow_options,
1202 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1203 err_msg: "expected create_flow",
1204 })?;
1205
1206 Ok(CreateFlowTask {
1207 catalog_name,
1208 flow_name,
1209 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1210 sink_table_name: sink_table_name
1211 .context(error::InvalidProtoMsgSnafu {
1212 err_msg: "expected sink_table_name",
1213 })?
1214 .into(),
1215 or_replace,
1216 create_if_not_exists,
1217 expire_after: expire_after.map(|e| e.value),
1218 eval_interval_secs: eval_interval.map(|e| e.seconds),
1219 comment,
1220 sql,
1221 flow_options,
1222 })
1223 }
1224}
1225
1226impl From<CreateFlowTask> for PbCreateFlowTask {
1227 fn from(
1228 CreateFlowTask {
1229 catalog_name,
1230 flow_name,
1231 source_table_names,
1232 sink_table_name,
1233 or_replace,
1234 create_if_not_exists,
1235 expire_after,
1236 eval_interval_secs: eval_interval,
1237 comment,
1238 sql,
1239 flow_options,
1240 }: CreateFlowTask,
1241 ) -> Self {
1242 PbCreateFlowTask {
1243 create_flow: Some(CreateFlowExpr {
1244 catalog_name,
1245 flow_name,
1246 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1247 sink_table_name: Some(sink_table_name.into()),
1248 or_replace,
1249 create_if_not_exists,
1250 expire_after: expire_after.map(|value| ExpireAfter { value }),
1251 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1252 comment,
1253 sql,
1254 flow_options,
1255 }),
1256 }
1257 }
1258}
1259
1260#[derive(Debug, Clone, Serialize, Deserialize)]
1262pub struct DropFlowTask {
1263 pub catalog_name: String,
1264 pub flow_name: String,
1265 pub flow_id: FlowId,
1266 pub drop_if_exists: bool,
1267}
1268
1269impl TryFrom<PbDropFlowTask> for DropFlowTask {
1270 type Error = error::Error;
1271
1272 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1273 let DropFlowExpr {
1274 catalog_name,
1275 flow_name,
1276 flow_id,
1277 drop_if_exists,
1278 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1279 err_msg: "expected drop_flow",
1280 })?;
1281 let flow_id = flow_id
1282 .context(error::InvalidProtoMsgSnafu {
1283 err_msg: "expected flow_id",
1284 })?
1285 .id;
1286 Ok(DropFlowTask {
1287 catalog_name,
1288 flow_name,
1289 flow_id,
1290 drop_if_exists,
1291 })
1292 }
1293}
1294
1295impl From<DropFlowTask> for PbDropFlowTask {
1296 fn from(
1297 DropFlowTask {
1298 catalog_name,
1299 flow_name,
1300 flow_id,
1301 drop_if_exists,
1302 }: DropFlowTask,
1303 ) -> Self {
1304 PbDropFlowTask {
1305 drop_flow: Some(DropFlowExpr {
1306 catalog_name,
1307 flow_name,
1308 flow_id: Some(api::v1::FlowId { id: flow_id }),
1309 drop_if_exists,
1310 }),
1311 }
1312 }
1313}
1314
1315#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1317pub enum CommentObjectId {
1318 Table(TableId),
1319 Flow(FlowId),
1320}
1321
1322#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1324pub struct CommentOnTask {
1325 pub catalog_name: String,
1326 pub schema_name: String,
1327 pub object_type: CommentObjectType,
1328 pub object_name: String,
1329 pub column_name: Option<String>,
1331 pub object_id: Option<CommentObjectId>,
1333 pub comment: Option<String>,
1334}
1335
1336#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1337pub enum CommentObjectType {
1338 Table,
1339 Column,
1340 Flow,
1341}
1342
1343impl CommentOnTask {
1344 pub fn table_ref(&self) -> TableReference<'_> {
1345 TableReference {
1346 catalog: &self.catalog_name,
1347 schema: &self.schema_name,
1348 table: &self.object_name,
1349 }
1350 }
1351}
1352
1353impl From<CommentObjectType> for PbCommentObjectType {
1355 fn from(object_type: CommentObjectType) -> Self {
1356 match object_type {
1357 CommentObjectType::Table => PbCommentObjectType::Table,
1358 CommentObjectType::Column => PbCommentObjectType::Column,
1359 CommentObjectType::Flow => PbCommentObjectType::Flow,
1360 }
1361 }
1362}
1363
1364impl TryFrom<i32> for CommentObjectType {
1365 type Error = error::Error;
1366
1367 fn try_from(value: i32) -> Result<Self> {
1368 match value {
1369 0 => Ok(CommentObjectType::Table),
1370 1 => Ok(CommentObjectType::Column),
1371 2 => Ok(CommentObjectType::Flow),
1372 _ => error::InvalidProtoMsgSnafu {
1373 err_msg: format!(
1374 "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1375 value
1376 ),
1377 }
1378 .fail(),
1379 }
1380 }
1381}
1382
1383impl TryFrom<PbCommentOnTask> for CommentOnTask {
1385 type Error = error::Error;
1386
1387 fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1388 let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1389 err_msg: "expected comment_on",
1390 })?;
1391
1392 Ok(CommentOnTask {
1393 catalog_name: comment_on.catalog_name,
1394 schema_name: comment_on.schema_name,
1395 object_type: comment_on.object_type.try_into()?,
1396 object_name: comment_on.object_name,
1397 column_name: if comment_on.column_name.is_empty() {
1398 None
1399 } else {
1400 Some(comment_on.column_name)
1401 },
1402 comment: if comment_on.comment.is_empty() {
1403 None
1404 } else {
1405 Some(comment_on.comment)
1406 },
1407 object_id: None,
1408 })
1409 }
1410}
1411
1412impl From<CommentOnTask> for PbCommentOnTask {
1413 fn from(task: CommentOnTask) -> Self {
1414 let pb_object_type: PbCommentObjectType = task.object_type.into();
1415 PbCommentOnTask {
1416 comment_on: Some(CommentOnExpr {
1417 catalog_name: task.catalog_name,
1418 schema_name: task.schema_name,
1419 object_type: pb_object_type as i32,
1420 object_name: task.object_name,
1421 column_name: task.column_name.unwrap_or_default(),
1422 comment: task.comment.unwrap_or_default(),
1423 }),
1424 }
1425 }
1426}
1427
1428#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
1429pub struct QueryContext {
1430 pub current_catalog: String,
1431 pub current_schema: String,
1432 pub timezone: String,
1433 pub extensions: HashMap<String, String>,
1434 pub channel: u8,
1435 #[serde(default)]
1437 pub snapshot_seqs: HashMap<u64, u64>,
1438 #[serde(default)]
1440 pub sst_min_sequences: HashMap<u64, u64>,
1441}
1442
1443impl QueryContext {
1444 pub fn current_catalog(&self) -> &str {
1446 &self.current_catalog
1447 }
1448
1449 pub fn current_schema(&self) -> &str {
1451 &self.current_schema
1452 }
1453
1454 pub fn timezone(&self) -> &str {
1456 &self.timezone
1457 }
1458
1459 pub fn extensions(&self) -> &HashMap<String, String> {
1461 &self.extensions
1462 }
1463
1464 pub fn channel(&self) -> u8 {
1466 self.channel
1467 }
1468
1469 pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
1470 &self.snapshot_seqs
1471 }
1472
1473 pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
1474 &self.sst_min_sequences
1475 }
1476}
1477
1478#[derive(Debug, Clone, Serialize, PartialEq)]
1482pub struct FlowQueryContext {
1483 pub catalog: String,
1485 pub schema: String,
1487 pub timezone: String,
1489 #[serde(default)]
1491 pub extensions: HashMap<String, String>,
1492 #[serde(default)]
1494 pub channel: u8,
1495 #[serde(default)]
1497 pub snapshot_seqs: HashMap<u64, u64>,
1498 #[serde(default)]
1500 pub sst_min_sequences: HashMap<u64, u64>,
1501}
1502
1503impl<'de> Deserialize<'de> for FlowQueryContext {
1504 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1505 where
1506 D: serde::Deserializer<'de>,
1507 {
1508 #[derive(Deserialize)]
1510 #[serde(untagged)]
1511 enum ContextCompat {
1512 Flow(FlowQueryContextHelper),
1513 Full(QueryContext),
1514 }
1515
1516 #[derive(Deserialize)]
1517 struct FlowQueryContextHelper {
1518 catalog: String,
1519 schema: String,
1520 timezone: String,
1521 #[serde(default)]
1522 extensions: HashMap<String, String>,
1523 #[serde(default)]
1524 channel: u8,
1525 #[serde(default)]
1526 snapshot_seqs: HashMap<u64, u64>,
1527 #[serde(default)]
1528 sst_min_sequences: HashMap<u64, u64>,
1529 }
1530
1531 match ContextCompat::deserialize(deserializer)? {
1532 ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1533 catalog: helper.catalog,
1534 schema: helper.schema,
1535 timezone: helper.timezone,
1536 extensions: helper.extensions,
1537 channel: helper.channel,
1538 snapshot_seqs: helper.snapshot_seqs,
1539 sst_min_sequences: helper.sst_min_sequences,
1540 }),
1541 ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1542 }
1543 }
1544}
1545
1546impl From<PbQueryContext> for QueryContext {
1547 fn from(pb_ctx: PbQueryContext) -> Self {
1548 let (snapshot_seqs, sst_min_sequences) = pb_ctx
1549 .snapshot_seqs
1550 .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
1551 .unwrap_or_default();
1552
1553 Self {
1554 current_catalog: pb_ctx.current_catalog,
1555 current_schema: pb_ctx.current_schema,
1556 timezone: pb_ctx.timezone,
1557 extensions: pb_ctx.extensions,
1558 channel: pb_ctx.channel as u8,
1559 snapshot_seqs,
1560 sst_min_sequences,
1561 }
1562 }
1563}
1564
1565impl From<QueryContext> for PbQueryContext {
1566 fn from(
1567 QueryContext {
1568 current_catalog,
1569 current_schema,
1570 timezone,
1571 extensions,
1572 channel,
1573 snapshot_seqs,
1574 sst_min_sequences,
1575 }: QueryContext,
1576 ) -> Self {
1577 PbQueryContext {
1578 current_catalog,
1579 current_schema,
1580 timezone,
1581 extensions,
1582 channel: channel as u32,
1583 snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
1584 api::v1::SnapshotSequences {
1585 snapshot_seqs,
1586 sst_min_sequences,
1587 },
1588 ),
1589 explain: None,
1590 }
1591 }
1592}
1593
1594impl From<QueryContext> for FlowQueryContext {
1595 fn from(ctx: QueryContext) -> Self {
1596 Self {
1597 catalog: ctx.current_catalog,
1598 schema: ctx.current_schema,
1599 timezone: ctx.timezone,
1600 extensions: ctx.extensions,
1601 channel: ctx.channel,
1602 snapshot_seqs: ctx.snapshot_seqs,
1603 sst_min_sequences: ctx.sst_min_sequences,
1604 }
1605 }
1606}
1607
1608impl From<FlowQueryContext> for QueryContext {
1609 fn from(flow_ctx: FlowQueryContext) -> Self {
1610 Self {
1611 current_catalog: flow_ctx.catalog,
1612 current_schema: flow_ctx.schema,
1613 timezone: flow_ctx.timezone,
1614 extensions: flow_ctx.extensions,
1615 channel: flow_ctx.channel,
1616 snapshot_seqs: flow_ctx.snapshot_seqs,
1617 sst_min_sequences: flow_ctx.sst_min_sequences,
1618 }
1619 }
1620}
1621
1622impl From<FlowQueryContext> for PbQueryContext {
1623 fn from(flow_ctx: FlowQueryContext) -> Self {
1624 let query_ctx: QueryContext = flow_ctx.into();
1625 query_ctx.into()
1626 }
1627}
1628
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{TableInfo, TableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    /// A `CreateTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);

        let output = serde_json::to_vec(&task).unwrap();

        let de: CreateTableTask = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// An `AlterTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de: AlterTableTask = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// `sort_columns` must reorder the column definitions and keep the indices
    /// stored in the table metadata consistent with the new order.
    #[test]
    fn test_sort_columns() {
        // Columns are declared out of name order on purpose:
        // column3 (tag), column1 (time index), column2 (field).
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new(
                "column3".to_string(),
                ConcreteDataType::string_datatype(),
                true,
            ),
            ColumnSchema::new(
                "column1".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(
                "column2".to_string(),
                ConcreteDataType::float64_datatype(),
                true,
            ),
        ]));

        // Indices refer to positions in the unsorted schema above.
        let meta = TableMeta {
            schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = TableInfo {
            ident: Default::default(),
            meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // The expression lists its columns in the same unsorted order.
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // Column definitions are expected in the order column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // The metadata indices must follow the columns to their new positions.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index(),
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    /// Converting a `QueryContext` into a `FlowQueryContext` must carry over
    /// every field, including the extensions.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        let extensions = HashMap::from([
            ("key1".to_string(), "value1".to_string()),
            ("key2".to_string(), "value2".to_string()),
        ]);

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions: extensions.clone(),
            channel: 5,
            snapshot_seqs: HashMap::from([(10, 100)]),
            sst_min_sequences: HashMap::from([(10, 90)]),
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
        // The extensions are populated above, so verify they are not lost.
        assert_eq!(flow_ctx.extensions, extensions);
        assert_eq!(flow_ctx.channel, 5);
        assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
        assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
    }

    /// Converting a `FlowQueryContext` into a `QueryContext` and back must be
    /// lossless.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
            extensions: HashMap::from([("k".to_string(), "v".to_string())]),
            channel: 7,
            snapshot_seqs: HashMap::from([(11, 111)]),
            sst_min_sequences: HashMap::from([(11, 101)]),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert_eq!(
            query_ctx.extensions,
            HashMap::from([("k".to_string(), "v".to_string())])
        );
        assert_eq!(query_ctx.channel, 7);
        assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
        assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    /// A `FlowQueryContext` must round-trip through JSON and be written with
    /// the `catalog`/`schema`/`timezone` keys.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions: HashMap::new(),
            channel: 0,
            snapshot_seqs: HashMap::new(),
            sst_min_sequences: HashMap::new(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        // Inspect the raw JSON to pin the serialized key names.
        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    /// Converting a `FlowQueryContext` into the protobuf context must keep all
    /// fields and leave `explain` unset.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
            extensions: HashMap::from([("x".to_string(), "y".to_string())]),
            channel: 6,
            snapshot_seqs: HashMap::from([(3, 30)]),
            sst_min_sequences: HashMap::from([(3, 21)]),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert_eq!(
            pb_ctx.extensions,
            HashMap::from([("x".to_string(), "y".to_string())])
        );
        assert_eq!(pb_ctx.channel, 6);
        assert_eq!(
            pb_ctx.snapshot_seqs,
            Some(api::v1::SnapshotSequences {
                snapshot_seqs: HashMap::from([(3, 30)]),
                sst_min_sequences: HashMap::from([(3, 21)]),
            })
        );
        assert!(pb_ctx.explain.is_none());
    }

    /// A protobuf context carrying snapshot sequences must round-trip through
    /// `QueryContext` unchanged.
    #[test]
    fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
        let pb = PbQueryContext {
            current_catalog: "c1".to_string(),
            current_schema: "s1".to_string(),
            timezone: "UTC".to_string(),
            extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
            channel: 3,
            snapshot_seqs: Some(api::v1::SnapshotSequences {
                snapshot_seqs: HashMap::from([(1, 100)]),
                sst_min_sequences: HashMap::from([(1, 90)]),
            }),
            explain: None,
        };

        let query_ctx: QueryContext = pb.clone().into();
        let pb_roundtrip: PbQueryContext = query_ctx.into();

        assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
        assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
        assert_eq!(pb_roundtrip.timezone, pb.timezone);
        assert_eq!(pb_roundtrip.extensions, pb.extensions);
        assert_eq!(pb_roundtrip.channel, pb.channel);
        assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
    }
}