1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27 AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28 CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29 CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30 CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31 DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32 DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33 DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34 TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37 AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38 CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39 DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40 QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57 InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// Extension key string `__greptime_origin_frontend.addr`.
///
/// NOTE(review): judging by the name, this keys the address of the frontend a
/// request originated from; confirm against the call sites.
pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";
63
64#[derive(Debug, Clone)]
66pub enum DdlTask {
67 CreateTable(CreateTableTask),
68 DropTable(DropTableTask),
69 AlterTable(AlterTableTask),
70 TruncateTable(TruncateTableTask),
71 CreateLogicalTables(Vec<CreateTableTask>),
72 DropLogicalTables(Vec<DropTableTask>),
73 AlterLogicalTables(Vec<AlterTableTask>),
74 CreateDatabase(CreateDatabaseTask),
75 DropDatabase(DropDatabaseTask),
76 AlterDatabase(AlterDatabaseTask),
77 CreateFlow(CreateFlowTask),
78 DropFlow(DropFlowTask),
79 #[cfg(feature = "enterprise")]
80 DropTrigger(trigger::DropTriggerTask),
81 CreateView(CreateViewTask),
82 DropView(DropViewTask),
83 #[cfg(feature = "enterprise")]
84 CreateTrigger(trigger::CreateTriggerTask),
85 CommentOn(CommentOnTask),
86}
87
88impl DdlTask {
89 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
91 DdlTask::CreateFlow(expr)
92 }
93
94 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
96 DdlTask::DropFlow(expr)
97 }
98
99 pub fn new_drop_view(expr: DropViewTask) -> Self {
101 DdlTask::DropView(expr)
102 }
103
104 pub fn new_create_table(
106 expr: CreateTableExpr,
107 partitions: Vec<Partition>,
108 table_info: TableInfo,
109 ) -> Self {
110 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
111 }
112
113 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
115 DdlTask::CreateLogicalTables(
116 table_data
117 .into_iter()
118 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
119 .collect(),
120 )
121 }
122
123 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
125 DdlTask::AlterLogicalTables(
126 table_data
127 .into_iter()
128 .map(|alter_table| AlterTableTask { alter_table })
129 .collect(),
130 )
131 }
132
133 pub fn new_drop_table(
135 catalog: String,
136 schema: String,
137 table: String,
138 table_id: TableId,
139 drop_if_exists: bool,
140 ) -> Self {
141 DdlTask::DropTable(DropTableTask {
142 catalog,
143 schema,
144 table,
145 table_id,
146 drop_if_exists,
147 })
148 }
149
150 pub fn new_create_database(
152 catalog: String,
153 schema: String,
154 create_if_not_exists: bool,
155 options: HashMap<String, String>,
156 ) -> Self {
157 DdlTask::CreateDatabase(CreateDatabaseTask {
158 catalog,
159 schema,
160 create_if_not_exists,
161 options,
162 })
163 }
164
165 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
167 DdlTask::DropDatabase(DropDatabaseTask {
168 catalog,
169 schema,
170 drop_if_exists,
171 })
172 }
173
174 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
176 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
177 }
178
179 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
181 DdlTask::AlterTable(AlterTableTask { alter_table })
182 }
183
184 pub fn new_truncate_table(
186 catalog: String,
187 schema: String,
188 table: String,
189 table_id: TableId,
190 time_ranges: Vec<(Timestamp, Timestamp)>,
191 ) -> Self {
192 DdlTask::TruncateTable(TruncateTableTask {
193 catalog,
194 schema,
195 table,
196 table_id,
197 time_ranges,
198 })
199 }
200
201 pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
203 DdlTask::CreateView(CreateViewTask {
204 create_view,
205 view_info,
206 })
207 }
208
209 pub fn new_comment_on(task: CommentOnTask) -> Self {
211 DdlTask::CommentOn(task)
212 }
213}
214
215impl TryFrom<Task> for DdlTask {
216 type Error = error::Error;
217 fn try_from(task: Task) -> Result<Self> {
218 match task {
219 Task::CreateTableTask(create_table) => {
220 Ok(DdlTask::CreateTable(create_table.try_into()?))
221 }
222 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
223 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
224 Task::TruncateTableTask(truncate_table) => {
225 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
226 }
227 Task::CreateTableTasks(create_tables) => {
228 let tasks = create_tables
229 .tasks
230 .into_iter()
231 .map(|task| task.try_into())
232 .collect::<Result<Vec<_>>>()?;
233
234 Ok(DdlTask::CreateLogicalTables(tasks))
235 }
236 Task::DropTableTasks(drop_tables) => {
237 let tasks = drop_tables
238 .tasks
239 .into_iter()
240 .map(|task| task.try_into())
241 .collect::<Result<Vec<_>>>()?;
242
243 Ok(DdlTask::DropLogicalTables(tasks))
244 }
245 Task::AlterTableTasks(alter_tables) => {
246 let tasks = alter_tables
247 .tasks
248 .into_iter()
249 .map(|task| task.try_into())
250 .collect::<Result<Vec<_>>>()?;
251
252 Ok(DdlTask::AlterLogicalTables(tasks))
253 }
254 Task::CreateDatabaseTask(create_database) => {
255 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
256 }
257 Task::DropDatabaseTask(drop_database) => {
258 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
259 }
260 Task::AlterDatabaseTask(alter_database) => {
261 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
262 }
263 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
264 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
265 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
266 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
267 Task::CreateTriggerTask(create_trigger) => {
268 #[cfg(feature = "enterprise")]
269 return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
270 #[cfg(not(feature = "enterprise"))]
271 {
272 let _ = create_trigger;
273 crate::error::UnsupportedSnafu {
274 operation: "create trigger",
275 }
276 .fail()
277 }
278 }
279 Task::DropTriggerTask(drop_trigger) => {
280 #[cfg(feature = "enterprise")]
281 return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
282 #[cfg(not(feature = "enterprise"))]
283 {
284 let _ = drop_trigger;
285 crate::error::UnsupportedSnafu {
286 operation: "drop trigger",
287 }
288 .fail()
289 }
290 }
291 Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
292 }
293 }
294}
295
296#[derive(Clone)]
297pub struct SubmitDdlTaskRequest {
298 pub query_context: QueryContext,
299 pub wait: bool,
300 pub timeout: Duration,
301 pub task: DdlTask,
302}
303
304impl SubmitDdlTaskRequest {
305 pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
307 Self {
308 query_context,
309 wait: Self::default_wait(),
310 timeout: Self::default_timeout(),
311 task,
312 }
313 }
314
315 pub fn default_timeout() -> Duration {
317 Duration::from_secs(60)
318 }
319
320 pub fn default_wait() -> bool {
322 true
323 }
324}
325
326impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
327 type Error = error::Error;
328
329 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
330 let task = match request.task {
331 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
332 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
333 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
334 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
335 DdlTask::CreateLogicalTables(tasks) => {
336 let tasks = tasks
337 .into_iter()
338 .map(|task| task.try_into())
339 .collect::<Result<Vec<_>>>()?;
340
341 Task::CreateTableTasks(PbCreateTableTasks { tasks })
342 }
343 DdlTask::DropLogicalTables(tasks) => {
344 let tasks = tasks
345 .into_iter()
346 .map(|task| task.into())
347 .collect::<Vec<_>>();
348
349 Task::DropTableTasks(PbDropTableTasks { tasks })
350 }
351 DdlTask::AlterLogicalTables(tasks) => {
352 let tasks = tasks
353 .into_iter()
354 .map(|task| task.try_into())
355 .collect::<Result<Vec<_>>>()?;
356
357 Task::AlterTableTasks(PbAlterTableTasks { tasks })
358 }
359 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
360 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
361 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
362 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
363 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
364 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
365 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
366 #[cfg(feature = "enterprise")]
367 DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
368 #[cfg(feature = "enterprise")]
369 DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
370 DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
371 };
372
373 Ok(Self {
374 header: None,
375 query_context: Some(request.query_context.into()),
376 timeout_secs: request.timeout.as_secs() as u32,
377 wait: request.wait,
378 task: Some(task),
379 })
380 }
381}
382
383#[derive(Debug, Default)]
384pub struct SubmitDdlTaskResponse {
385 pub key: Vec<u8>,
386 pub table_ids: Vec<TableId>,
388}
389
390impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
391 type Error = error::Error;
392
393 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
394 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
395 Ok(Self {
396 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
397 table_ids,
398 })
399 }
400}
401
402impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
403 fn from(val: SubmitDdlTaskResponse) -> Self {
404 Self {
405 pid: Some(ProcedureId { key: val.key }),
406 table_ids: val
407 .table_ids
408 .into_iter()
409 .map(|id| api::v1::TableId { id })
410 .collect(),
411 ..Default::default()
412 }
413 }
414}
415
416#[derive(Debug, PartialEq, Clone)]
418pub struct CreateViewTask {
419 pub create_view: CreateViewExpr,
420 pub view_info: TableInfo,
421}
422
423impl CreateViewTask {
424 pub fn table_ref(&self) -> TableReference<'_> {
426 TableReference {
427 catalog: &self.create_view.catalog_name,
428 schema: &self.create_view.schema_name,
429 table: &self.create_view.view_name,
430 }
431 }
432
433 pub fn raw_logical_plan(&self) -> &Vec<u8> {
435 &self.create_view.logical_plan
436 }
437
438 pub fn view_definition(&self) -> &str {
440 &self.create_view.definition
441 }
442
443 pub fn table_names(&self) -> HashSet<TableName> {
445 self.create_view
446 .table_names
447 .iter()
448 .map(|t| t.clone().into())
449 .collect()
450 }
451
452 pub fn columns(&self) -> &Vec<String> {
454 &self.create_view.columns
455 }
456
457 pub fn plan_columns(&self) -> &Vec<String> {
459 &self.create_view.plan_columns
460 }
461}
462
463impl TryFrom<PbCreateViewTask> for CreateViewTask {
464 type Error = error::Error;
465
466 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
467 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
468
469 Ok(CreateViewTask {
470 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
471 err_msg: "expected create view",
472 })?,
473 view_info,
474 })
475 }
476}
477
478impl TryFrom<CreateViewTask> for PbCreateViewTask {
479 type Error = error::Error;
480
481 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
482 Ok(PbCreateViewTask {
483 create_view: Some(task.create_view),
484 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
485 })
486 }
487}
488
489impl Serialize for CreateViewTask {
490 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
491 where
492 S: serde::Serializer,
493 {
494 let view_info = serde_json::to_vec(&self.view_info)
495 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
496
497 let pb = PbCreateViewTask {
498 create_view: Some(self.create_view.clone()),
499 view_info,
500 };
501 let buf = pb.encode_to_vec();
502 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
503 serializer.serialize_str(&encoded)
504 }
505}
506
507impl<'de> Deserialize<'de> for CreateViewTask {
508 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
509 where
510 D: serde::Deserializer<'de>,
511 {
512 let encoded = String::deserialize(deserializer)?;
513 let buf = general_purpose::STANDARD_NO_PAD
514 .decode(encoded)
515 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
516 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
517 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
518
519 let expr = CreateViewTask::try_from(expr)
520 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
521
522 Ok(expr)
523 }
524}
525
526#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
528pub struct DropViewTask {
529 pub catalog: String,
530 pub schema: String,
531 pub view: String,
532 pub view_id: TableId,
533 pub drop_if_exists: bool,
534}
535
536impl DropViewTask {
537 pub fn table_ref(&self) -> TableReference<'_> {
539 TableReference {
540 catalog: &self.catalog,
541 schema: &self.schema,
542 table: &self.view,
543 }
544 }
545}
546
547impl TryFrom<PbDropViewTask> for DropViewTask {
548 type Error = error::Error;
549
550 fn try_from(pb: PbDropViewTask) -> Result<Self> {
551 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
552 err_msg: "expected drop view",
553 })?;
554
555 Ok(DropViewTask {
556 catalog: expr.catalog_name,
557 schema: expr.schema_name,
558 view: expr.view_name,
559 view_id: expr
560 .view_id
561 .context(error::InvalidProtoMsgSnafu {
562 err_msg: "expected view_id",
563 })?
564 .id,
565 drop_if_exists: expr.drop_if_exists,
566 })
567 }
568}
569
570impl From<DropViewTask> for PbDropViewTask {
571 fn from(task: DropViewTask) -> Self {
572 PbDropViewTask {
573 drop_view: Some(DropViewExpr {
574 catalog_name: task.catalog,
575 schema_name: task.schema,
576 view_name: task.view,
577 view_id: Some(api::v1::TableId { id: task.view_id }),
578 drop_if_exists: task.drop_if_exists,
579 }),
580 }
581 }
582}
583
584#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
585pub struct DropTableTask {
586 pub catalog: String,
587 pub schema: String,
588 pub table: String,
589 pub table_id: TableId,
590 #[serde(default)]
591 pub drop_if_exists: bool,
592}
593
594impl DropTableTask {
595 pub fn table_ref(&self) -> TableReference<'_> {
596 TableReference {
597 catalog: &self.catalog,
598 schema: &self.schema,
599 table: &self.table,
600 }
601 }
602
603 pub fn table_name(&self) -> TableName {
604 TableName {
605 catalog_name: self.catalog.clone(),
606 schema_name: self.schema.clone(),
607 table_name: self.table.clone(),
608 }
609 }
610}
611
612impl TryFrom<PbDropTableTask> for DropTableTask {
613 type Error = error::Error;
614
615 fn try_from(pb: PbDropTableTask) -> Result<Self> {
616 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
617 err_msg: "expected drop table",
618 })?;
619
620 Ok(Self {
621 catalog: drop_table.catalog_name,
622 schema: drop_table.schema_name,
623 table: drop_table.table_name,
624 table_id: drop_table
625 .table_id
626 .context(error::InvalidProtoMsgSnafu {
627 err_msg: "expected table_id",
628 })?
629 .id,
630 drop_if_exists: drop_table.drop_if_exists,
631 })
632 }
633}
634
635impl From<DropTableTask> for PbDropTableTask {
636 fn from(task: DropTableTask) -> Self {
637 PbDropTableTask {
638 drop_table: Some(DropTableExpr {
639 catalog_name: task.catalog,
640 schema_name: task.schema,
641 table_name: task.table,
642 table_id: Some(api::v1::TableId { id: task.table_id }),
643 drop_if_exists: task.drop_if_exists,
644 }),
645 }
646 }
647}
648
649#[derive(Debug, PartialEq, Clone)]
650pub struct CreateTableTask {
651 pub create_table: CreateTableExpr,
652 pub partitions: Vec<Partition>,
653 pub table_info: TableInfo,
654}
655
656impl TryFrom<PbCreateTableTask> for CreateTableTask {
657 type Error = error::Error;
658
659 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
660 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
661
662 Ok(CreateTableTask::new(
663 pb.create_table.context(error::InvalidProtoMsgSnafu {
664 err_msg: "expected create table",
665 })?,
666 pb.partitions,
667 table_info,
668 ))
669 }
670}
671
672impl TryFrom<CreateTableTask> for PbCreateTableTask {
673 type Error = error::Error;
674
675 fn try_from(task: CreateTableTask) -> Result<Self> {
676 Ok(PbCreateTableTask {
677 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
678 create_table: Some(task.create_table),
679 partitions: task.partitions,
680 })
681 }
682}
683
684impl CreateTableTask {
685 pub fn new(
686 expr: CreateTableExpr,
687 partitions: Vec<Partition>,
688 table_info: TableInfo,
689 ) -> CreateTableTask {
690 CreateTableTask {
691 create_table: expr,
692 partitions,
693 table_info,
694 }
695 }
696
697 pub fn table_name(&self) -> TableName {
698 let table = &self.create_table;
699
700 TableName {
701 catalog_name: table.catalog_name.clone(),
702 schema_name: table.schema_name.clone(),
703 table_name: table.table_name.clone(),
704 }
705 }
706
707 pub fn table_ref(&self) -> TableReference<'_> {
708 let table = &self.create_table;
709
710 TableReference {
711 catalog: &table.catalog_name,
712 schema: &table.schema_name,
713 table: &table.table_name,
714 }
715 }
716
717 pub fn set_table_id(&mut self, table_id: TableId) {
719 self.table_info.ident.table_id = table_id;
720 }
721
722 pub fn sort_columns(&mut self) {
727 self.create_table
730 .column_defs
731 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
732
733 self.table_info.sort_columns();
734 }
735}
736
737impl Serialize for CreateTableTask {
738 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
739 where
740 S: serde::Serializer,
741 {
742 let table_info = serde_json::to_vec(&self.table_info)
743 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
744
745 let pb = PbCreateTableTask {
746 create_table: Some(self.create_table.clone()),
747 partitions: self.partitions.clone(),
748 table_info,
749 };
750 let buf = pb.encode_to_vec();
751 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
752 serializer.serialize_str(&encoded)
753 }
754}
755
756impl<'de> Deserialize<'de> for CreateTableTask {
757 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
758 where
759 D: serde::Deserializer<'de>,
760 {
761 let encoded = String::deserialize(deserializer)?;
762 let buf = general_purpose::STANDARD_NO_PAD
763 .decode(encoded)
764 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
765 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
766 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
767
768 let expr = CreateTableTask::try_from(expr)
769 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
770
771 Ok(expr)
772 }
773}
774
775#[derive(Debug, PartialEq, Clone)]
776pub struct AlterTableTask {
777 pub alter_table: AlterTableExpr,
779}
780
781impl AlterTableTask {
782 pub fn validate(&self) -> Result<()> {
783 self.alter_table
784 .kind
785 .as_ref()
786 .context(error::UnexpectedSnafu {
787 err_msg: "'kind' is absent",
788 })?;
789 Ok(())
790 }
791
792 pub fn table_ref(&self) -> TableReference<'_> {
793 TableReference {
794 catalog: &self.alter_table.catalog_name,
795 schema: &self.alter_table.schema_name,
796 table: &self.alter_table.table_name,
797 }
798 }
799
800 pub fn table_name(&self) -> TableName {
801 let table = &self.alter_table;
802
803 TableName {
804 catalog_name: table.catalog_name.clone(),
805 schema_name: table.schema_name.clone(),
806 table_name: table.table_name.clone(),
807 }
808 }
809}
810
811impl TryFrom<PbAlterTableTask> for AlterTableTask {
812 type Error = error::Error;
813
814 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
815 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
816 err_msg: "expected alter_table",
817 })?;
818
819 Ok(AlterTableTask { alter_table })
820 }
821}
822
823impl TryFrom<AlterTableTask> for PbAlterTableTask {
824 type Error = error::Error;
825
826 fn try_from(task: AlterTableTask) -> Result<Self> {
827 Ok(PbAlterTableTask {
828 alter_table: Some(task.alter_table),
829 })
830 }
831}
832
833impl Serialize for AlterTableTask {
834 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
835 where
836 S: serde::Serializer,
837 {
838 let pb = PbAlterTableTask {
839 alter_table: Some(self.alter_table.clone()),
840 };
841 let buf = pb.encode_to_vec();
842 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
843 serializer.serialize_str(&encoded)
844 }
845}
846
847impl<'de> Deserialize<'de> for AlterTableTask {
848 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
849 where
850 D: serde::Deserializer<'de>,
851 {
852 let encoded = String::deserialize(deserializer)?;
853 let buf = general_purpose::STANDARD_NO_PAD
854 .decode(encoded)
855 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
856 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
857 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
858
859 let expr = AlterTableTask::try_from(expr)
860 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
861
862 Ok(expr)
863 }
864}
865
866#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
867pub struct TruncateTableTask {
868 pub catalog: String,
869 pub schema: String,
870 pub table: String,
871 pub table_id: TableId,
872 pub time_ranges: Vec<(Timestamp, Timestamp)>,
873}
874
875impl TruncateTableTask {
876 pub fn table_ref(&self) -> TableReference<'_> {
877 TableReference {
878 catalog: &self.catalog,
879 schema: &self.schema,
880 table: &self.table,
881 }
882 }
883
884 pub fn table_name(&self) -> TableName {
885 TableName {
886 catalog_name: self.catalog.clone(),
887 schema_name: self.schema.clone(),
888 table_name: self.table.clone(),
889 }
890 }
891}
892
893impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
894 type Error = error::Error;
895
896 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
897 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
898 err_msg: "expected truncate table",
899 })?;
900
901 Ok(Self {
902 catalog: truncate_table.catalog_name,
903 schema: truncate_table.schema_name,
904 table: truncate_table.table_name,
905 table_id: truncate_table
906 .table_id
907 .context(error::InvalidProtoMsgSnafu {
908 err_msg: "expected table_id",
909 })?
910 .id,
911 time_ranges: truncate_table
912 .time_ranges
913 .map(from_pb_time_ranges)
914 .transpose()
915 .map_err(BoxedError::new)
916 .context(ExternalSnafu)?
917 .unwrap_or_default(),
918 })
919 }
920}
921
922impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
923 type Error = error::Error;
924
925 fn try_from(task: TruncateTableTask) -> Result<Self> {
926 Ok(PbTruncateTableTask {
927 truncate_table: Some(TruncateTableExpr {
928 catalog_name: task.catalog,
929 schema_name: task.schema,
930 table_name: task.table,
931 table_id: Some(api::v1::TableId { id: task.table_id }),
932 time_ranges: Some(
933 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
934 ),
935 }),
936 })
937 }
938}
939
940#[serde_as]
941#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
942pub struct CreateDatabaseTask {
943 pub catalog: String,
944 pub schema: String,
945 pub create_if_not_exists: bool,
946 #[serde_as(deserialize_as = "DefaultOnNull")]
947 pub options: HashMap<String, String>,
948}
949
950impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
951 type Error = error::Error;
952
953 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
954 let CreateDatabaseExpr {
955 catalog_name,
956 schema_name,
957 create_if_not_exists,
958 options,
959 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
960 err_msg: "expected create database",
961 })?;
962
963 Ok(CreateDatabaseTask {
964 catalog: catalog_name,
965 schema: schema_name,
966 create_if_not_exists,
967 options,
968 })
969 }
970}
971
972impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
973 type Error = error::Error;
974
975 fn try_from(
976 CreateDatabaseTask {
977 catalog,
978 schema,
979 create_if_not_exists,
980 options,
981 }: CreateDatabaseTask,
982 ) -> Result<Self> {
983 Ok(PbCreateDatabaseTask {
984 create_database: Some(CreateDatabaseExpr {
985 catalog_name: catalog,
986 schema_name: schema,
987 create_if_not_exists,
988 options,
989 }),
990 })
991 }
992}
993
994#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
995pub struct DropDatabaseTask {
996 pub catalog: String,
997 pub schema: String,
998 pub drop_if_exists: bool,
999}
1000
1001impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
1002 type Error = error::Error;
1003
1004 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1005 let DropDatabaseExpr {
1006 catalog_name,
1007 schema_name,
1008 drop_if_exists,
1009 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1010 err_msg: "expected drop database",
1011 })?;
1012
1013 Ok(DropDatabaseTask {
1014 catalog: catalog_name,
1015 schema: schema_name,
1016 drop_if_exists,
1017 })
1018 }
1019}
1020
1021impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1022 type Error = error::Error;
1023
1024 fn try_from(
1025 DropDatabaseTask {
1026 catalog,
1027 schema,
1028 drop_if_exists,
1029 }: DropDatabaseTask,
1030 ) -> Result<Self> {
1031 Ok(PbDropDatabaseTask {
1032 drop_database: Some(DropDatabaseExpr {
1033 catalog_name: catalog,
1034 schema_name: schema,
1035 drop_if_exists,
1036 }),
1037 })
1038 }
1039}
1040
1041#[derive(Debug, PartialEq, Clone)]
1042pub struct AlterDatabaseTask {
1043 pub alter_expr: AlterDatabaseExpr,
1044}
1045
1046impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1047 type Error = error::Error;
1048
1049 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1050 Ok(PbAlterDatabaseTask {
1051 task: Some(task.alter_expr),
1052 })
1053 }
1054}
1055
1056impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1057 type Error = error::Error;
1058
1059 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1060 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1061 err_msg: "expected alter database",
1062 })?;
1063
1064 Ok(AlterDatabaseTask { alter_expr })
1065 }
1066}
1067
1068impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1069 type Error = error::Error;
1070
1071 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1072 match pb {
1073 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1074 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1075 options
1076 .set_database_options
1077 .into_iter()
1078 .map(SetDatabaseOption::try_from)
1079 .collect::<Result<Vec<_>>>()?,
1080 )))
1081 }
1082 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1083 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1084 options
1085 .keys
1086 .iter()
1087 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1088 .collect::<Result<Vec<_>>>()?,
1089 )),
1090 ),
1091 }
1092 }
1093}
1094
/// Lower-case option key of the TTL database option; keys are lower-cased
/// before being compared against it.
const TTL_KEY: &str = "ttl";
1096
1097impl TryFrom<PbOption> for SetDatabaseOption {
1098 type Error = error::Error;
1099
1100 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1101 let key_lower = key.to_ascii_lowercase();
1102 match key_lower.as_str() {
1103 TTL_KEY => {
1104 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1105 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1106
1107 Ok(SetDatabaseOption::Ttl(ttl))
1108 }
1109 _ => {
1110 if validate_database_option(&key_lower) {
1111 Ok(SetDatabaseOption::Other(key_lower, value))
1112 } else {
1113 InvalidSetDatabaseOptionSnafu { key, value }.fail()
1114 }
1115 }
1116 }
1117 }
1118}
1119
1120#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1121pub enum SetDatabaseOption {
1122 Ttl(DatabaseTimeToLive),
1123 Other(String, String),
1124}
1125
1126#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1127pub enum UnsetDatabaseOption {
1128 Ttl,
1129 Other(String),
1130}
1131
1132impl TryFrom<&str> for UnsetDatabaseOption {
1133 type Error = error::Error;
1134
1135 fn try_from(key: &str) -> Result<Self> {
1136 let key_lower = key.to_ascii_lowercase();
1137 match key_lower.as_str() {
1138 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1139 _ => {
1140 if validate_database_option(&key_lower) {
1141 Ok(UnsetDatabaseOption::Other(key_lower))
1142 } else {
1143 InvalidUnsetDatabaseOptionSnafu { key }.fail()
1144 }
1145 }
1146 }
1147 }
1148}
1149
1150#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1151pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1152
1153#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1154pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1155
1156#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1157pub enum AlterDatabaseKind {
1158 SetDatabaseOptions(SetDatabaseOptions),
1159 UnsetDatabaseOptions(UnsetDatabaseOptions),
1160}
1161
1162impl AlterDatabaseTask {
1163 pub fn catalog(&self) -> &str {
1164 &self.alter_expr.catalog_name
1165 }
1166
1167 pub fn schema(&self) -> &str {
1168 &self.alter_expr.catalog_name
1169 }
1170}
1171
1172#[derive(Debug, Clone, Serialize, Deserialize)]
1174pub struct CreateFlowTask {
1175 pub catalog_name: String,
1176 pub flow_name: String,
1177 pub source_table_names: Vec<TableName>,
1178 pub sink_table_name: TableName,
1179 pub or_replace: bool,
1180 pub create_if_not_exists: bool,
1181 pub expire_after: Option<i64>,
1183 pub eval_interval_secs: Option<i64>,
1184 pub comment: String,
1185 pub sql: String,
1186 pub flow_options: HashMap<String, String>,
1187}
1188
1189impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1190 type Error = error::Error;
1191
1192 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1193 let CreateFlowExpr {
1194 catalog_name,
1195 flow_name,
1196 source_table_names,
1197 sink_table_name,
1198 or_replace,
1199 create_if_not_exists,
1200 expire_after,
1201 eval_interval,
1202 comment,
1203 sql,
1204 flow_options,
1205 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1206 err_msg: "expected create_flow",
1207 })?;
1208
1209 Ok(CreateFlowTask {
1210 catalog_name,
1211 flow_name,
1212 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1213 sink_table_name: sink_table_name
1214 .context(error::InvalidProtoMsgSnafu {
1215 err_msg: "expected sink_table_name",
1216 })?
1217 .into(),
1218 or_replace,
1219 create_if_not_exists,
1220 expire_after: expire_after.map(|e| e.value),
1221 eval_interval_secs: eval_interval.map(|e| e.seconds),
1222 comment,
1223 sql,
1224 flow_options,
1225 })
1226 }
1227}
1228
1229impl From<CreateFlowTask> for PbCreateFlowTask {
1230 fn from(
1231 CreateFlowTask {
1232 catalog_name,
1233 flow_name,
1234 source_table_names,
1235 sink_table_name,
1236 or_replace,
1237 create_if_not_exists,
1238 expire_after,
1239 eval_interval_secs: eval_interval,
1240 comment,
1241 sql,
1242 flow_options,
1243 }: CreateFlowTask,
1244 ) -> Self {
1245 PbCreateFlowTask {
1246 create_flow: Some(CreateFlowExpr {
1247 catalog_name,
1248 flow_name,
1249 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1250 sink_table_name: Some(sink_table_name.into()),
1251 or_replace,
1252 create_if_not_exists,
1253 expire_after: expire_after.map(|value| ExpireAfter { value }),
1254 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1255 comment,
1256 sql,
1257 flow_options,
1258 }),
1259 }
1260 }
1261}
1262
/// In-memory form of a drop-flow DDL task, convertible to and from
/// [`PbDropFlowTask`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    /// Catalog name, copied verbatim from the protobuf expression.
    pub catalog_name: String,
    /// Name of the flow.
    pub flow_name: String,
    /// The `id` of the protobuf `FlowId`; conversion from protobuf fails when it is absent.
    pub flow_id: FlowId,
    /// Mirrors the protobuf `drop_if_exists` flag.
    pub drop_if_exists: bool,
}
1271
1272impl TryFrom<PbDropFlowTask> for DropFlowTask {
1273 type Error = error::Error;
1274
1275 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1276 let DropFlowExpr {
1277 catalog_name,
1278 flow_name,
1279 flow_id,
1280 drop_if_exists,
1281 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1282 err_msg: "expected drop_flow",
1283 })?;
1284 let flow_id = flow_id
1285 .context(error::InvalidProtoMsgSnafu {
1286 err_msg: "expected flow_id",
1287 })?
1288 .id;
1289 Ok(DropFlowTask {
1290 catalog_name,
1291 flow_name,
1292 flow_id,
1293 drop_if_exists,
1294 })
1295 }
1296}
1297
1298impl From<DropFlowTask> for PbDropFlowTask {
1299 fn from(
1300 DropFlowTask {
1301 catalog_name,
1302 flow_name,
1303 flow_id,
1304 drop_if_exists,
1305 }: DropFlowTask,
1306 ) -> Self {
1307 PbDropFlowTask {
1308 drop_flow: Some(DropFlowExpr {
1309 catalog_name,
1310 flow_name,
1311 flow_id: Some(api::v1::FlowId { id: flow_id }),
1312 drop_if_exists,
1313 }),
1314 }
1315 }
1316}
1317
/// Identifier of the object a comment is attached to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    /// A table, identified by its [`TableId`].
    Table(TableId),
    /// A flow, identified by its [`FlowId`].
    Flow(FlowId),
}
1324
/// In-memory form of a comment-on DDL task, convertible to and from
/// [`PbCommentOnTask`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    /// Catalog name, copied verbatim from the protobuf expression.
    pub catalog_name: String,
    /// Schema name, copied verbatim from the protobuf expression.
    pub schema_name: String,
    /// Kind of object being commented on.
    pub object_type: CommentObjectType,
    /// Name of the object; [`CommentOnTask::table_ref`] uses it as the table name.
    pub object_name: String,
    /// Column name; `None` when the protobuf field is empty.
    pub column_name: Option<String>,
    /// Not carried by the protobuf message: always `None` right after
    /// conversion from protobuf and dropped when converting back.
    pub object_id: Option<CommentObjectId>,
    /// Comment text; `None` when the protobuf field is empty.
    pub comment: Option<String>,
}
1338
/// Kind of object a comment applies to; mirrors [`PbCommentObjectType`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    /// Decoded from protobuf value `0`.
    Table,
    /// Decoded from protobuf value `1`.
    Column,
    /// Decoded from protobuf value `2`.
    Flow,
}
1345
1346impl CommentOnTask {
1347 pub fn table_ref(&self) -> TableReference<'_> {
1348 TableReference {
1349 catalog: &self.catalog_name,
1350 schema: &self.schema_name,
1351 table: &self.object_name,
1352 }
1353 }
1354}
1355
1356impl From<CommentObjectType> for PbCommentObjectType {
1358 fn from(object_type: CommentObjectType) -> Self {
1359 match object_type {
1360 CommentObjectType::Table => PbCommentObjectType::Table,
1361 CommentObjectType::Column => PbCommentObjectType::Column,
1362 CommentObjectType::Flow => PbCommentObjectType::Flow,
1363 }
1364 }
1365}
1366
1367impl TryFrom<i32> for CommentObjectType {
1368 type Error = error::Error;
1369
1370 fn try_from(value: i32) -> Result<Self> {
1371 match value {
1372 0 => Ok(CommentObjectType::Table),
1373 1 => Ok(CommentObjectType::Column),
1374 2 => Ok(CommentObjectType::Flow),
1375 _ => error::InvalidProtoMsgSnafu {
1376 err_msg: format!(
1377 "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1378 value
1379 ),
1380 }
1381 .fail(),
1382 }
1383 }
1384}
1385
1386impl TryFrom<PbCommentOnTask> for CommentOnTask {
1388 type Error = error::Error;
1389
1390 fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1391 let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1392 err_msg: "expected comment_on",
1393 })?;
1394
1395 Ok(CommentOnTask {
1396 catalog_name: comment_on.catalog_name,
1397 schema_name: comment_on.schema_name,
1398 object_type: comment_on.object_type.try_into()?,
1399 object_name: comment_on.object_name,
1400 column_name: if comment_on.column_name.is_empty() {
1401 None
1402 } else {
1403 Some(comment_on.column_name)
1404 },
1405 comment: if comment_on.comment.is_empty() {
1406 None
1407 } else {
1408 Some(comment_on.comment)
1409 },
1410 object_id: None,
1411 })
1412 }
1413}
1414
1415impl From<CommentOnTask> for PbCommentOnTask {
1416 fn from(task: CommentOnTask) -> Self {
1417 let pb_object_type: PbCommentObjectType = task.object_type.into();
1418 PbCommentOnTask {
1419 comment_on: Some(CommentOnExpr {
1420 catalog_name: task.catalog_name,
1421 schema_name: task.schema_name,
1422 object_type: pb_object_type as i32,
1423 object_name: task.object_name,
1424 column_name: task.column_name.unwrap_or_default(),
1425 comment: task.comment.unwrap_or_default(),
1426 }),
1427 }
1428 }
1429}
1430
/// Serde-serializable counterpart of the protobuf [`PbQueryContext`];
/// convertible to and from it and [`FlowQueryContext`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct QueryContext {
    /// Current catalog name.
    pub current_catalog: String,
    /// Current schema name.
    pub current_schema: String,
    /// Timezone, stored as a plain string.
    pub timezone: String,
    /// Free-form key/value extensions.
    pub extensions: HashMap<String, String>,
    /// Channel value; narrowed from the protobuf `u32` with an `as u8` cast.
    pub channel: u8,
    /// Taken from the protobuf `SnapshotSequences::snapshot_seqs`; empty when
    /// missing from serialized input.
    #[serde(default)]
    pub snapshot_seqs: HashMap<u64, u64>,
    /// Taken from the protobuf `SnapshotSequences::sst_min_sequences`; empty
    /// when missing from serialized input.
    #[serde(default)]
    pub sst_min_sequences: HashMap<u64, u64>,
}
1445
1446impl QueryContext {
1447 pub fn current_catalog(&self) -> &str {
1449 &self.current_catalog
1450 }
1451
1452 pub fn current_schema(&self) -> &str {
1454 &self.current_schema
1455 }
1456
1457 pub fn timezone(&self) -> &str {
1459 &self.timezone
1460 }
1461
1462 pub fn extensions(&self) -> &HashMap<String, String> {
1464 &self.extensions
1465 }
1466
1467 pub fn channel(&self) -> u8 {
1469 self.channel
1470 }
1471
1472 pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
1473 &self.snapshot_seqs
1474 }
1475
1476 pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
1477 &self.sst_min_sequences
1478 }
1479}
1480
/// Variant of [`QueryContext`] whose catalog and schema fields are named
/// `catalog` and `schema`.
///
/// Only `Serialize` is derived; `Deserialize` is implemented by hand so that
/// input written in the [`QueryContext`] layout is accepted as well.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    /// Catalog name (`current_catalog` in [`QueryContext`]).
    pub catalog: String,
    /// Schema name (`current_schema` in [`QueryContext`]).
    pub schema: String,
    /// Timezone, stored as a plain string.
    pub timezone: String,
    /// Free-form key/value extensions.
    #[serde(default)]
    pub extensions: HashMap<String, String>,
    /// Channel value.
    #[serde(default)]
    pub channel: u8,
    /// Snapshot sequence map, carried over unchanged to and from [`QueryContext`].
    #[serde(default)]
    pub snapshot_seqs: HashMap<u64, u64>,
    /// SST minimal sequence map, carried over unchanged to and from [`QueryContext`].
    #[serde(default)]
    pub sst_min_sequences: HashMap<u64, u64>,
}
1505
1506impl<'de> Deserialize<'de> for FlowQueryContext {
1507 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1508 where
1509 D: serde::Deserializer<'de>,
1510 {
1511 #[derive(Deserialize)]
1513 #[serde(untagged)]
1514 enum ContextCompat {
1515 Flow(FlowQueryContextHelper),
1516 Full(QueryContext),
1517 }
1518
1519 #[derive(Deserialize)]
1520 struct FlowQueryContextHelper {
1521 catalog: String,
1522 schema: String,
1523 timezone: String,
1524 #[serde(default)]
1525 extensions: HashMap<String, String>,
1526 #[serde(default)]
1527 channel: u8,
1528 #[serde(default)]
1529 snapshot_seqs: HashMap<u64, u64>,
1530 #[serde(default)]
1531 sst_min_sequences: HashMap<u64, u64>,
1532 }
1533
1534 match ContextCompat::deserialize(deserializer)? {
1535 ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1536 catalog: helper.catalog,
1537 schema: helper.schema,
1538 timezone: helper.timezone,
1539 extensions: helper.extensions,
1540 channel: helper.channel,
1541 snapshot_seqs: helper.snapshot_seqs,
1542 sst_min_sequences: helper.sst_min_sequences,
1543 }),
1544 ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1545 }
1546 }
1547}
1548
1549impl From<PbQueryContext> for QueryContext {
1550 fn from(pb_ctx: PbQueryContext) -> Self {
1551 let (snapshot_seqs, sst_min_sequences) = pb_ctx
1552 .snapshot_seqs
1553 .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
1554 .unwrap_or_default();
1555
1556 Self {
1557 current_catalog: pb_ctx.current_catalog,
1558 current_schema: pb_ctx.current_schema,
1559 timezone: pb_ctx.timezone,
1560 extensions: pb_ctx.extensions,
1561 channel: pb_ctx.channel as u8,
1562 snapshot_seqs,
1563 sst_min_sequences,
1564 }
1565 }
1566}
1567
1568impl From<QueryContext> for PbQueryContext {
1569 fn from(
1570 QueryContext {
1571 current_catalog,
1572 current_schema,
1573 timezone,
1574 extensions,
1575 channel,
1576 snapshot_seqs,
1577 sst_min_sequences,
1578 }: QueryContext,
1579 ) -> Self {
1580 PbQueryContext {
1581 current_catalog,
1582 current_schema,
1583 timezone,
1584 extensions,
1585 channel: channel as u32,
1586 snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
1587 api::v1::SnapshotSequences {
1588 snapshot_seqs,
1589 sst_min_sequences,
1590 },
1591 ),
1592 explain: None,
1593 }
1594 }
1595}
1596
1597impl From<QueryContext> for FlowQueryContext {
1598 fn from(ctx: QueryContext) -> Self {
1599 Self {
1600 catalog: ctx.current_catalog,
1601 schema: ctx.current_schema,
1602 timezone: ctx.timezone,
1603 extensions: ctx.extensions,
1604 channel: ctx.channel,
1605 snapshot_seqs: ctx.snapshot_seqs,
1606 sst_min_sequences: ctx.sst_min_sequences,
1607 }
1608 }
1609}
1610
1611impl From<FlowQueryContext> for QueryContext {
1612 fn from(flow_ctx: FlowQueryContext) -> Self {
1613 Self {
1614 current_catalog: flow_ctx.catalog,
1615 current_schema: flow_ctx.schema,
1616 timezone: flow_ctx.timezone,
1617 extensions: flow_ctx.extensions,
1618 channel: flow_ctx.channel,
1619 snapshot_seqs: flow_ctx.snapshot_seqs,
1620 sst_min_sequences: flow_ctx.sst_min_sequences,
1621 }
1622 }
1623}
1624
1625impl From<FlowQueryContext> for PbQueryContext {
1626 fn from(flow_ctx: FlowQueryContext) -> Self {
1627 let query_ctx: QueryContext = flow_ctx.into();
1628 query_ctx.into()
1629 }
1630}
1631
1632#[cfg(test)]
1633mod tests {
1634 use std::sync::Arc;
1635
1636 use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1637 use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
1638 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1639 use store_api::storage::ConcreteDataType;
1640 use table::metadata::{TableInfo, TableMeta, TableType};
1641 use table::test_util::table_info::test_table_info;
1642
1643 use super::{AlterTableTask, CreateTableTask, *};
1644
1645 #[test]
1646 fn test_basic_ser_de_create_table_task() {
1647 let schema = SchemaBuilder::default().build().unwrap();
1648 let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1649 let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);
1650
1651 let output = serde_json::to_vec(&task).unwrap();
1652
1653 let de = serde_json::from_slice(&output).unwrap();
1654 assert_eq!(task, de);
1655 }
1656
1657 #[test]
1658 fn test_basic_ser_de_alter_table_task() {
1659 let task = AlterTableTask {
1660 alter_table: AlterTableExpr::default(),
1661 };
1662
1663 let output = serde_json::to_vec(&task).unwrap();
1664
1665 let de = serde_json::from_slice(&output).unwrap();
1666 assert_eq!(task, de);
1667 }
1668
1669 #[test]
1670 fn test_sort_columns() {
1671 let schema = Arc::new(Schema::new(vec![
1673 ColumnSchema::new(
1674 "column3".to_string(),
1675 ConcreteDataType::string_datatype(),
1676 true,
1677 ),
1678 ColumnSchema::new(
1679 "column1".to_string(),
1680 ConcreteDataType::timestamp_millisecond_datatype(),
1681 false,
1682 )
1683 .with_time_index(true),
1684 ColumnSchema::new(
1685 "column2".to_string(),
1686 ConcreteDataType::float64_datatype(),
1687 true,
1688 ),
1689 ]));
1690
1691 let meta = TableMeta {
1693 schema,
1694 primary_key_indices: vec![0],
1695 value_indices: vec![2],
1696 engine: METRIC_ENGINE_NAME.to_string(),
1697 next_column_id: 0,
1698 options: Default::default(),
1699 created_on: Default::default(),
1700 updated_on: Default::default(),
1701 partition_key_indices: Default::default(),
1702 column_ids: Default::default(),
1703 };
1704
1705 let raw_table_info = TableInfo {
1707 ident: Default::default(),
1708 meta,
1709 name: Default::default(),
1710 desc: Default::default(),
1711 catalog_name: Default::default(),
1712 schema_name: Default::default(),
1713 table_type: TableType::Base,
1714 };
1715
1716 let create_table_expr = CreateTableExpr {
1718 column_defs: vec![
1719 ColumnDef {
1720 name: "column3".to_string(),
1721 semantic_type: SemanticType::Tag as i32,
1722 ..Default::default()
1723 },
1724 ColumnDef {
1725 name: "column1".to_string(),
1726 semantic_type: SemanticType::Timestamp as i32,
1727 ..Default::default()
1728 },
1729 ColumnDef {
1730 name: "column2".to_string(),
1731 semantic_type: SemanticType::Field as i32,
1732 ..Default::default()
1733 },
1734 ],
1735 primary_keys: vec!["column3".to_string()],
1736 ..Default::default()
1737 };
1738
1739 let mut create_table_task =
1740 CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1741
1742 create_table_task.sort_columns();
1744
1745 assert_eq!(
1747 create_table_task.create_table.column_defs[0].name,
1748 "column1".to_string()
1749 );
1750 assert_eq!(
1751 create_table_task.create_table.column_defs[1].name,
1752 "column2".to_string()
1753 );
1754 assert_eq!(
1755 create_table_task.create_table.column_defs[2].name,
1756 "column3".to_string()
1757 );
1758
1759 assert_eq!(
1761 create_table_task.table_info.meta.schema.timestamp_index(),
1762 Some(0)
1763 );
1764 assert_eq!(
1765 create_table_task.table_info.meta.primary_key_indices,
1766 vec![2]
1767 );
1768 assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1769 }
1770
1771 #[test]
1772 fn test_flow_query_context_conversion_from_query_context() {
1773 use std::collections::HashMap;
1774 let mut extensions = HashMap::new();
1775 extensions.insert("key1".to_string(), "value1".to_string());
1776 extensions.insert("key2".to_string(), "value2".to_string());
1777
1778 let query_ctx = QueryContext {
1779 current_catalog: "test_catalog".to_string(),
1780 current_schema: "test_schema".to_string(),
1781 timezone: "UTC".to_string(),
1782 extensions,
1783 channel: 5,
1784 snapshot_seqs: HashMap::from([(10, 100)]),
1785 sst_min_sequences: HashMap::from([(10, 90)]),
1786 };
1787
1788 let flow_ctx: FlowQueryContext = query_ctx.into();
1789
1790 assert_eq!(flow_ctx.catalog, "test_catalog");
1791 assert_eq!(flow_ctx.schema, "test_schema");
1792 assert_eq!(flow_ctx.timezone, "UTC");
1793 assert_eq!(flow_ctx.channel, 5);
1794 assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
1795 assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
1796 }
1797
1798 #[test]
1799 fn test_flow_query_context_conversion_to_query_context() {
1800 let flow_ctx = FlowQueryContext {
1801 catalog: "prod_catalog".to_string(),
1802 schema: "public".to_string(),
1803 timezone: "America/New_York".to_string(),
1804 extensions: HashMap::from([("k".to_string(), "v".to_string())]),
1805 channel: 7,
1806 snapshot_seqs: HashMap::from([(11, 111)]),
1807 sst_min_sequences: HashMap::from([(11, 101)]),
1808 };
1809
1810 let query_ctx: QueryContext = flow_ctx.clone().into();
1811
1812 assert_eq!(query_ctx.current_catalog, "prod_catalog");
1813 assert_eq!(query_ctx.current_schema, "public");
1814 assert_eq!(query_ctx.timezone, "America/New_York");
1815 assert_eq!(
1816 query_ctx.extensions,
1817 HashMap::from([("k".to_string(), "v".to_string())])
1818 );
1819 assert_eq!(query_ctx.channel, 7);
1820 assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
1821 assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));
1822
1823 let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1825 assert_eq!(flow_ctx, flow_ctx_roundtrip);
1826 }
1827
1828 #[test]
1829 fn test_flow_query_context_serialization() {
1830 let flow_ctx = FlowQueryContext {
1831 catalog: "test_catalog".to_string(),
1832 schema: "test_schema".to_string(),
1833 timezone: "UTC".to_string(),
1834 extensions: HashMap::new(),
1835 channel: 0,
1836 snapshot_seqs: HashMap::new(),
1837 sst_min_sequences: HashMap::new(),
1838 };
1839
1840 let serialized = serde_json::to_string(&flow_ctx).unwrap();
1841 let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1842
1843 assert_eq!(flow_ctx, deserialized);
1844
1845 let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1847 assert_eq!(json_value["catalog"], "test_catalog");
1848 assert_eq!(json_value["schema"], "test_schema");
1849 assert_eq!(json_value["timezone"], "UTC");
1850 }
1851
1852 #[test]
1853 fn test_flow_query_context_conversion_to_pb() {
1854 let flow_ctx = FlowQueryContext {
1855 catalog: "pb_catalog".to_string(),
1856 schema: "pb_schema".to_string(),
1857 timezone: "Asia/Tokyo".to_string(),
1858 extensions: HashMap::from([("x".to_string(), "y".to_string())]),
1859 channel: 6,
1860 snapshot_seqs: HashMap::from([(3, 30)]),
1861 sst_min_sequences: HashMap::from([(3, 21)]),
1862 };
1863
1864 let pb_ctx: PbQueryContext = flow_ctx.into();
1865
1866 assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1867 assert_eq!(pb_ctx.current_schema, "pb_schema");
1868 assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1869 assert_eq!(
1870 pb_ctx.extensions,
1871 HashMap::from([("x".to_string(), "y".to_string())])
1872 );
1873 assert_eq!(pb_ctx.channel, 6);
1874 assert_eq!(
1875 pb_ctx.snapshot_seqs,
1876 Some(api::v1::SnapshotSequences {
1877 snapshot_seqs: HashMap::from([(3, 30)]),
1878 sst_min_sequences: HashMap::from([(3, 21)]),
1879 })
1880 );
1881 assert!(pb_ctx.explain.is_none());
1882 }
1883
1884 #[test]
1885 fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
1886 let pb = PbQueryContext {
1887 current_catalog: "c1".to_string(),
1888 current_schema: "s1".to_string(),
1889 timezone: "UTC".to_string(),
1890 extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
1891 channel: 3,
1892 snapshot_seqs: Some(api::v1::SnapshotSequences {
1893 snapshot_seqs: HashMap::from([(1, 100)]),
1894 sst_min_sequences: HashMap::from([(1, 90)]),
1895 }),
1896 explain: None,
1897 };
1898
1899 let query_ctx: QueryContext = pb.clone().into();
1900 let pb_roundtrip: PbQueryContext = query_ctx.into();
1901
1902 assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
1903 assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
1904 assert_eq!(pb_roundtrip.timezone, pb.timezone);
1905 assert_eq!(pb_roundtrip.extensions, pb.extensions);
1906 assert_eq!(pb_roundtrip.channel, pb.channel);
1907 assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
1908 }
1909}