#[cfg(feature = "enterprise")]
pub mod trigger;

use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;

use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
    TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
    QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::Engine as _;
use base64::engine::general_purpose;
use common_error::ext::BoxedError;
use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{DefaultOnNull, serde_as};
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId};
use table::requests::validate_database_option;
use table::table_name::TableName;
use table::table_reference::TableReference;

use crate::error::{
    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
};
use crate::key::FlowId;

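/// A DDL task handled by the metadata service, covering table, database, view, flow,
/// trigger (enterprise only) and comment operations.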
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    CommentOn(CommentOnTask),
}

impl DdlTask {
    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
        DdlTask::CreateFlow(expr)
    }

    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
        DdlTask::DropFlow(expr)
    }

    pub fn new_drop_view(expr: DropViewTask) -> Self {
        DdlTask::DropView(expr)
    }

    pub fn new_create_table(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> Self {
        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
    }

    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
        DdlTask::CreateLogicalTables(
            table_data
                .into_iter()
                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
                .collect(),
        )
    }

    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
        DdlTask::AlterLogicalTables(
            table_data
                .into_iter()
                .map(|alter_table| AlterTableTask { alter_table })
                .collect(),
        )
    }

    pub fn new_drop_table(
        catalog: String,
        schema: String,
        table: String,
        table_id: TableId,
        drop_if_exists: bool,
    ) -> Self {
        DdlTask::DropTable(DropTableTask {
            catalog,
            schema,
            table,
            table_id,
            drop_if_exists,
        })
    }

    pub fn new_create_database(
        catalog: String,
        schema: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
    ) -> Self {
        DdlTask::CreateDatabase(CreateDatabaseTask {
            catalog,
            schema,
            create_if_not_exists,
            options,
        })
    }

    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
        DdlTask::DropDatabase(DropDatabaseTask {
            catalog,
            schema,
            drop_if_exists,
        })
    }

    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
    }

    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
        DdlTask::AlterTable(AlterTableTask { alter_table })
    }

    pub fn new_truncate_table(
        catalog: String,
        schema: String,
        table: String,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
    ) -> Self {
        DdlTask::TruncateTable(TruncateTableTask {
            catalog,
            schema,
            table,
            table_id,
            time_ranges,
        })
    }

    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
        DdlTask::CreateView(CreateViewTask {
            create_view,
            view_info,
        })
    }

    pub fn new_comment_on(task: CommentOnTask) -> Self {
        DdlTask::CommentOn(task)
    }
}

impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            Task::CreateTriggerTask(create_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
        }
    }
}

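/// A request to submit a [`DdlTask`], carrying the query context it runs under,
/// whether to wait for completion, and a timeout.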
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    pub query_context: QueryContextRef,
    pub wait: bool,
    pub timeout: Duration,
    pub task: DdlTask,
}

impl SubmitDdlTaskRequest {
    pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self {
        Self {
            query_context,
            wait: Self::default_wait(),
            timeout: Self::default_timeout(),
            task,
        }
    }

    pub fn default_timeout() -> Duration {
        Duration::from_secs(60)
    }

    pub fn default_wait() -> bool {
        true
    }
}

impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
        };

        Ok(Self {
            header: None,
            query_context: Some((*request.query_context).clone().into()),
            timeout_secs: request.timeout.as_secs() as u32,
            wait: request.wait,
            task: Some(task),
        })
    }
}

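/// The response to a submitted DDL task: the procedure key and the ids of any tables
/// the task created.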
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    pub key: Vec<u8>,
    pub table_ids: Vec<TableId>,
}

impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
    type Error = error::Error;

    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
        Ok(Self {
            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
            table_ids,
        })
    }
}

impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
    fn from(val: SubmitDdlTaskResponse) -> Self {
        Self {
            pid: Some(ProcedureId { key: val.key }),
            table_ids: val
                .table_ids
                .into_iter()
                .map(|id| api::v1::TableId { id })
                .collect(),
            ..Default::default()
        }
    }
}

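/// A `CREATE VIEW` task carrying the view expression and the raw view metadata.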
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    pub create_view: CreateViewExpr,
    pub view_info: RawTableInfo,
}

impl CreateViewTask {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
            table: &self.create_view.view_name,
        }
    }

    pub fn raw_logical_plan(&self) -> &Vec<u8> {
        &self.create_view.logical_plan
    }

    pub fn view_definition(&self) -> &str {
        &self.create_view.definition
    }

    pub fn table_names(&self) -> HashSet<TableName> {
        self.create_view
            .table_names
            .iter()
            .map(|t| t.clone().into())
            .collect()
    }

    pub fn columns(&self) -> &Vec<String> {
        &self.create_view.columns
    }

    pub fn plan_columns(&self) -> &Vec<String> {
        &self.create_view.plan_columns
    }
}

impl TryFrom<PbCreateViewTask> for CreateViewTask {
    type Error = error::Error;

    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;

        Ok(CreateViewTask {
            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
                err_msg: "expected create view",
            })?,
            view_info,
        })
    }
}

impl TryFrom<CreateViewTask> for PbCreateViewTask {
    type Error = error::Error;

    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
        Ok(PbCreateViewTask {
            create_view: Some(task.create_view),
            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
        })
    }
}

impl Serialize for CreateViewTask {
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let view_info = serde_json::to_vec(&self.view_info)
            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

        let pb = PbCreateViewTask {
            create_view: Some(self.create_view.clone()),
            view_info,
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}

impl<'de> Deserialize<'de> for CreateViewTask {
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = CreateViewTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}

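/// A `DROP VIEW` task identifying the view by catalog, schema, name and id.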
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    pub drop_if_exists: bool,
}

impl DropViewTask {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.view,
        }
    }
}

impl TryFrom<PbDropViewTask> for DropViewTask {
    type Error = error::Error;

    fn try_from(pb: PbDropViewTask) -> Result<Self> {
        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop view",
        })?;

        Ok(DropViewTask {
            catalog: expr.catalog_name,
            schema: expr.schema_name,
            view: expr.view_name,
            view_id: expr
                .view_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected view_id",
                })?
                .id,
            drop_if_exists: expr.drop_if_exists,
        })
    }
}

impl From<DropViewTask> for PbDropViewTask {
    fn from(task: DropViewTask) -> Self {
        PbDropViewTask {
            drop_view: Some(DropViewExpr {
                catalog_name: task.catalog,
                schema_name: task.schema,
                view_name: task.view,
                view_id: Some(api::v1::TableId { id: task.view_id }),
                drop_if_exists: task.drop_if_exists,
            }),
        }
    }
}

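/// A `DROP TABLE` task identifying the table by catalog, schema, name and id.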
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    #[serde(default)]
    pub drop_if_exists: bool,
}

impl DropTableTask {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.table,
        }
    }

    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}

impl TryFrom<PbDropTableTask> for DropTableTask {
    type Error = error::Error;

    fn try_from(pb: PbDropTableTask) -> Result<Self> {
        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop table",
        })?;

        Ok(Self {
            catalog: drop_table.catalog_name,
            schema: drop_table.schema_name,
            table: drop_table.table_name,
            table_id: drop_table
                .table_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected table_id",
                })?
                .id,
            drop_if_exists: drop_table.drop_if_exists,
        })
    }
}

impl From<DropTableTask> for PbDropTableTask {
    fn from(task: DropTableTask) -> Self {
        PbDropTableTask {
            drop_table: Some(DropTableExpr {
                catalog_name: task.catalog,
                schema_name: task.schema,
                table_name: task.table,
                table_id: Some(api::v1::TableId { id: task.table_id }),
                drop_if_exists: task.drop_if_exists,
            }),
        }
    }
}

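/// A `CREATE TABLE` task carrying the create expression, partition rules and the raw
/// table metadata.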
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    pub partitions: Vec<Partition>,
    pub table_info: RawTableInfo,
}

impl TryFrom<PbCreateTableTask> for CreateTableTask {
    type Error = error::Error;

    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;

        Ok(CreateTableTask::new(
            pb.create_table.context(error::InvalidProtoMsgSnafu {
                err_msg: "expected create table",
            })?,
            pb.partitions,
            table_info,
        ))
    }
}

impl TryFrom<CreateTableTask> for PbCreateTableTask {
    type Error = error::Error;

    fn try_from(task: CreateTableTask) -> Result<Self> {
        Ok(PbCreateTableTask {
            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
            create_table: Some(task.create_table),
            partitions: task.partitions,
        })
    }
}

impl CreateTableTask {
    pub fn new(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> CreateTableTask {
        CreateTableTask {
            create_table: expr,
            partitions,
            table_info,
        }
    }

    pub fn table_name(&self) -> TableName {
        let table = &self.create_table;

        TableName {
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }

    pub fn table_ref(&self) -> TableReference<'_> {
        let table = &self.create_table;

        TableReference {
            catalog: &table.catalog_name,
            schema: &table.schema_name,
            table: &table.table_name,
        }
    }

    pub fn set_table_id(&mut self, table_id: TableId) {
        self.table_info.ident.table_id = table_id;
    }

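    /// Sorts the columns of the `CREATE TABLE` expression and of the raw table info by
    /// column name, keeping the two representations consistent after the reorder.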
    pub fn sort_columns(&mut self) {
        self.create_table
            .column_defs
            .sort_unstable_by(|a, b| a.name.cmp(&b.name));

        self.table_info.sort_columns();
    }
}

impl Serialize for CreateTableTask {
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let table_info = serde_json::to_vec(&self.table_info)
            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

        let pb = PbCreateTableTask {
            create_table: Some(self.create_table.clone()),
            partitions: self.partitions.clone(),
            table_info,
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}

impl<'de> Deserialize<'de> for CreateTableTask {
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = CreateTableTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}

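/// An `ALTER TABLE` task wrapping the alter expression.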
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}

impl AlterTableTask {
    pub fn validate(&self) -> Result<()> {
        self.alter_table
            .kind
            .as_ref()
            .context(error::UnexpectedSnafu {
                err_msg: "'kind' is absent",
            })?;
        Ok(())
    }

    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.alter_table.catalog_name,
            schema: &self.alter_table.schema_name,
            table: &self.alter_table.table_name,
        }
    }

    pub fn table_name(&self) -> TableName {
        let table = &self.alter_table;

        TableName {
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }
}

impl TryFrom<PbAlterTableTask> for AlterTableTask {
    type Error = error::Error;

    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected alter_table",
        })?;

        Ok(AlterTableTask { alter_table })
    }
}

impl TryFrom<AlterTableTask> for PbAlterTableTask {
    type Error = error::Error;

    fn try_from(task: AlterTableTask) -> Result<Self> {
        Ok(PbAlterTableTask {
            alter_table: Some(task.alter_table),
        })
    }
}

impl Serialize for AlterTableTask {
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let pb = PbAlterTableTask {
            alter_table: Some(self.alter_table.clone()),
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}

impl<'de> Deserialize<'de> for AlterTableTask {
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = AlterTableTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}

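/// A `TRUNCATE TABLE` task, optionally scoped to a set of timestamp ranges.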
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}

impl TruncateTableTask {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.table,
        }
    }

    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}

impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
    type Error = error::Error;

    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected truncate table",
        })?;

        Ok(Self {
            catalog: truncate_table.catalog_name,
            schema: truncate_table.schema_name,
            table: truncate_table.table_name,
            table_id: truncate_table
                .table_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected table_id",
                })?
                .id,
            time_ranges: truncate_table
                .time_ranges
                .map(from_pb_time_ranges)
                .transpose()
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .unwrap_or_default(),
        })
    }
}

impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
    type Error = error::Error;

    fn try_from(task: TruncateTableTask) -> Result<Self> {
        Ok(PbTruncateTableTask {
            truncate_table: Some(TruncateTableExpr {
                catalog_name: task.catalog,
                schema_name: task.schema,
                table_name: task.table,
                table_id: Some(api::v1::TableId { id: task.table_id }),
                time_ranges: Some(
                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
                ),
            }),
        })
    }
}

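/// A `CREATE DATABASE` task; `options` holds database-level options such as `ttl`.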
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}

impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
    type Error = error::Error;

    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
        let CreateDatabaseExpr {
            catalog_name,
            schema_name,
            create_if_not_exists,
            options,
        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected create database",
        })?;

        Ok(CreateDatabaseTask {
            catalog: catalog_name,
            schema: schema_name,
            create_if_not_exists,
            options,
        })
    }
}

impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
    type Error = error::Error;

    fn try_from(
        CreateDatabaseTask {
            catalog,
            schema,
            create_if_not_exists,
            options,
        }: CreateDatabaseTask,
    ) -> Result<Self> {
        Ok(PbCreateDatabaseTask {
            create_database: Some(CreateDatabaseExpr {
                catalog_name: catalog,
                schema_name: schema,
                create_if_not_exists,
                options,
            }),
        })
    }
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub drop_if_exists: bool,
}

impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
    type Error = error::Error;

    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
        let DropDatabaseExpr {
            catalog_name,
            schema_name,
            drop_if_exists,
        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop database",
        })?;

        Ok(DropDatabaseTask {
            catalog: catalog_name,
            schema: schema_name,
            drop_if_exists,
        })
    }
}

impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
    type Error = error::Error;

    fn try_from(
        DropDatabaseTask {
            catalog,
            schema,
            drop_if_exists,
        }: DropDatabaseTask,
    ) -> Result<Self> {
        Ok(PbDropDatabaseTask {
            drop_database: Some(DropDatabaseExpr {
                catalog_name: catalog,
                schema_name: schema,
                drop_if_exists,
            }),
        })
    }
}

#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}

impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
    type Error = error::Error;

    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
        Ok(PbAlterDatabaseTask {
            task: Some(task.alter_expr),
        })
    }
}

impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
    type Error = error::Error;

    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected alter database",
        })?;

        Ok(AlterDatabaseTask { alter_expr })
    }
}

impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
    type Error = error::Error;

    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
        match pb {
            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
                    options
                        .set_database_options
                        .into_iter()
                        .map(SetDatabaseOption::try_from)
                        .collect::<Result<Vec<_>>>()?,
                )))
            }
            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
                    options
                        .keys
                        .iter()
                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
                        .collect::<Result<Vec<_>>>()?,
                )),
            ),
        }
    }
}

const TTL_KEY: &str = "ttl";

impl TryFrom<PbOption> for SetDatabaseOption {
    type Error = error::Error;

    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
        let key_lower = key.to_ascii_lowercase();
        match key_lower.as_str() {
            TTL_KEY => {
                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;

                Ok(SetDatabaseOption::Ttl(ttl))
            }
            _ => {
                if validate_database_option(&key_lower) {
                    Ok(SetDatabaseOption::Other(key_lower, value))
                } else {
                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
                }
            }
        }
    }
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    Ttl(DatabaseTimeToLive),
    Other(String, String),
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    Ttl,
    Other(String),
}

impl TryFrom<&str> for UnsetDatabaseOption {
    type Error = error::Error;

    fn try_from(key: &str) -> Result<Self> {
        let key_lower = key.to_ascii_lowercase();
        match key_lower.as_str() {
            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
            _ => {
                if validate_database_option(&key_lower) {
                    Ok(UnsetDatabaseOption::Other(key_lower))
                } else {
                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
                }
            }
        }
    }
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}

impl AlterDatabaseTask {
    pub fn catalog(&self) -> &str {
        &self.alter_expr.catalog_name
    }

    pub fn schema(&self) -> &str {
        &self.alter_expr.schema_name
    }
}

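/// A `CREATE FLOW` task carrying the flow definition: source and sink tables, expiry,
/// evaluation interval, comment, SQL, and extra flow options.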
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub source_table_names: Vec<TableName>,
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    pub expire_after: Option<i64>,
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}

impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
    type Error = error::Error;

    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
        let CreateFlowExpr {
            catalog_name,
            flow_name,
            source_table_names,
            sink_table_name,
            or_replace,
            create_if_not_exists,
            expire_after,
            eval_interval,
            comment,
            sql,
            flow_options,
        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected create_flow",
        })?;

        Ok(CreateFlowTask {
            catalog_name,
            flow_name,
            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
            sink_table_name: sink_table_name
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected sink_table_name",
                })?
                .into(),
            or_replace,
            create_if_not_exists,
            expire_after: expire_after.map(|e| e.value),
            eval_interval_secs: eval_interval.map(|e| e.seconds),
            comment,
            sql,
            flow_options,
        })
    }
}

impl From<CreateFlowTask> for PbCreateFlowTask {
    fn from(
        CreateFlowTask {
            catalog_name,
            flow_name,
            source_table_names,
            sink_table_name,
            or_replace,
            create_if_not_exists,
            expire_after,
            eval_interval_secs: eval_interval,
            comment,
            sql,
            flow_options,
        }: CreateFlowTask,
    ) -> Self {
        PbCreateFlowTask {
            create_flow: Some(CreateFlowExpr {
                catalog_name,
                flow_name,
                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
                sink_table_name: Some(sink_table_name.into()),
                or_replace,
                create_if_not_exists,
                expire_after: expire_after.map(|value| ExpireAfter { value }),
                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
                comment,
                sql,
                flow_options,
            }),
        }
    }
}

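/// A `DROP FLOW` task identifying the flow by catalog, name and id.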
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub flow_id: FlowId,
    pub drop_if_exists: bool,
}

impl TryFrom<PbDropFlowTask> for DropFlowTask {
    type Error = error::Error;

    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
        let DropFlowExpr {
            catalog_name,
            flow_name,
            flow_id,
            drop_if_exists,
        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop_flow",
        })?;
        let flow_id = flow_id
            .context(error::InvalidProtoMsgSnafu {
                err_msg: "expected flow_id",
            })?
            .id;
        Ok(DropFlowTask {
            catalog_name,
            flow_name,
            flow_id,
            drop_if_exists,
        })
    }
}

impl From<DropFlowTask> for PbDropFlowTask {
    fn from(
        DropFlowTask {
            catalog_name,
            flow_name,
            flow_id,
            drop_if_exists,
        }: DropFlowTask,
    ) -> Self {
        PbDropFlowTask {
            drop_flow: Some(DropFlowExpr {
                catalog_name,
                flow_name,
                flow_id: Some(api::v1::FlowId { id: flow_id }),
                drop_if_exists,
            }),
        }
    }
}

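/// The id of the object a comment is attached to: a table or a flow.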
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    Table(TableId),
    Flow(FlowId),
}

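/// A `COMMENT ON` task targeting a table, column, or flow. `column_name` is only set for
/// column comments; `object_id` is not carried in the protobuf message and starts out as
/// `None`.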
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    pub catalog_name: String,
    pub schema_name: String,
    pub object_type: CommentObjectType,
    pub object_name: String,
    pub column_name: Option<String>,
    pub object_id: Option<CommentObjectId>,
    pub comment: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    Table,
    Column,
    Flow,
}

impl CommentOnTask {
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog_name,
            schema: &self.schema_name,
            table: &self.object_name,
        }
    }
}

impl From<CommentObjectType> for PbCommentObjectType {
    fn from(object_type: CommentObjectType) -> Self {
        match object_type {
            CommentObjectType::Table => PbCommentObjectType::Table,
            CommentObjectType::Column => PbCommentObjectType::Column,
            CommentObjectType::Flow => PbCommentObjectType::Flow,
        }
    }
}

impl TryFrom<i32> for CommentObjectType {
    type Error = error::Error;

    fn try_from(value: i32) -> Result<Self> {
        match value {
            0 => Ok(CommentObjectType::Table),
            1 => Ok(CommentObjectType::Column),
            2 => Ok(CommentObjectType::Flow),
            _ => error::InvalidProtoMsgSnafu {
                err_msg: format!(
                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
                    value
                ),
            }
            .fail(),
        }
    }
}

impl TryFrom<PbCommentOnTask> for CommentOnTask {
    type Error = error::Error;

    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected comment_on",
        })?;

        Ok(CommentOnTask {
            catalog_name: comment_on.catalog_name,
            schema_name: comment_on.schema_name,
            object_type: comment_on.object_type.try_into()?,
            object_name: comment_on.object_name,
            column_name: if comment_on.column_name.is_empty() {
                None
            } else {
                Some(comment_on.column_name)
            },
            comment: if comment_on.comment.is_empty() {
                None
            } else {
                Some(comment_on.comment)
            },
            object_id: None,
        })
    }
}

impl From<CommentOnTask> for PbCommentOnTask {
    fn from(task: CommentOnTask) -> Self {
        let pb_object_type: PbCommentObjectType = task.object_type.into();
        PbCommentOnTask {
            comment_on: Some(CommentOnExpr {
                catalog_name: task.catalog_name,
                schema_name: task.schema_name,
                object_type: pb_object_type as i32,
                object_name: task.object_name,
                column_name: task.column_name.unwrap_or_default(),
                comment: task.comment.unwrap_or_default(),
            }),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    pub(crate) current_catalog: String,
    pub(crate) current_schema: String,
    pub(crate) timezone: String,
    pub(crate) extensions: HashMap<String, String>,
    pub(crate) channel: u8,
}

impl QueryContext {
    pub fn current_catalog(&self) -> &str {
        &self.current_catalog
    }

    pub fn current_schema(&self) -> &str {
        &self.current_schema
    }

    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    pub fn extensions(&self) -> &HashMap<String, String> {
        &self.extensions
    }

    pub fn channel(&self) -> u8 {
        self.channel
    }
}

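/// A trimmed-down query context stored with flow tasks; it keeps only the catalog,
/// schema, and timezone of the originating session.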
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    pub(crate) catalog: String,
    pub(crate) schema: String,
    pub(crate) timezone: String,
}

impl<'de> Deserialize<'de> for FlowQueryContext {
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
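        // Accept both the current `FlowQueryContext` layout and the older, full
        // `QueryContext` layout, so previously stored values still deserialize.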
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}

impl From<QueryContextRef> for QueryContext {
    fn from(query_context: QueryContextRef) -> Self {
        QueryContext {
            current_catalog: query_context.current_catalog().to_string(),
            current_schema: query_context.current_schema().clone(),
            timezone: query_context.timezone().to_string(),
            extensions: query_context.extensions(),
            channel: query_context.channel() as u8,
        }
    }
}

impl TryFrom<QueryContext> for session::context::QueryContext {
    type Error = error::Error;
    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
        Ok(QueryContextBuilder::default()
            .current_catalog(value.current_catalog)
            .current_schema(value.current_schema)
            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
            .extensions(value.extensions)
            .channel((value.channel as u32).into())
            .build())
    }
}

impl From<QueryContext> for PbQueryContext {
    fn from(
        QueryContext {
            current_catalog,
            current_schema,
            timezone,
            extensions,
            channel,
        }: QueryContext,
    ) -> Self {
        PbQueryContext {
            current_catalog,
            current_schema,
            timezone,
            extensions,
            channel: channel as u32,
            snapshot_seqs: None,
            explain: None,
        }
    }
}

impl From<QueryContext> for FlowQueryContext {
    fn from(ctx: QueryContext) -> Self {
        Self {
            catalog: ctx.current_catalog,
            schema: ctx.current_schema,
            timezone: ctx.timezone,
        }
    }
}

impl From<QueryContextRef> for FlowQueryContext {
    fn from(ctx: QueryContextRef) -> Self {
        Self {
            catalog: ctx.current_catalog().to_string(),
            schema: ctx.current_schema().clone(),
            timezone: ctx.timezone().to_string(),
        }
    }
}

impl From<FlowQueryContext> for QueryContext {
    fn from(flow_ctx: FlowQueryContext) -> Self {
        Self {
            current_catalog: flow_ctx.catalog,
            current_schema: flow_ctx.schema,
            timezone: flow_ctx.timezone,
            extensions: HashMap::new(),
            channel: 0,
        }
    }
}

impl From<FlowQueryContext> for PbQueryContext {
    fn from(flow_ctx: FlowQueryContext) -> Self {
        let query_ctx: QueryContext = flow_ctx.into();
        query_ctx.into()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    #[test]
    fn test_sort_columns() {
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}