1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26 AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
27 CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
28 CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
29 CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
30 DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
31 DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
32 DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
33 TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36 AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
37 CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
38 DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
39 QueryContext as PbQueryContext, TruncateTableExpr,
40};
41use base64::Engine as _;
42use base64::engine::general_purpose;
43use common_error::ext::BoxedError;
44use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
45use prost::Message;
46use serde::{Deserialize, Serialize};
47use serde_with::{DefaultOnNull, serde_as};
48use session::context::{QueryContextBuilder, QueryContextRef};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{RawTableInfo, TableId};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57 InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// A distributed DDL task to be submitted to and executed by the DDL procedure
/// framework.
///
/// Each variant carries the fully resolved payload for one kind of DDL
/// statement. The `*LogicalTables` variants batch several per-table tasks that
/// are executed as a single logical-table operation.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    // Trigger DDL is only available with the enterprise feature.
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    CommentOn(CommentOnTask),
}
84
impl DdlTask {
    /// Creates a [`DdlTask`] to create a flow.
    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
        DdlTask::CreateFlow(expr)
    }

    /// Creates a [`DdlTask`] to drop a flow.
    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
        DdlTask::DropFlow(expr)
    }

    /// Creates a [`DdlTask`] to drop a view.
    pub fn new_drop_view(expr: DropViewTask) -> Self {
        DdlTask::DropView(expr)
    }

    /// Creates a [`DdlTask`] to create a table with the given partitions and
    /// raw table info.
    pub fn new_create_table(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> Self {
        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
    }

    /// Creates a [`DdlTask`] to create several logical tables in one batch.
    /// Logical tables carry no partitions of their own.
    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
        DdlTask::CreateLogicalTables(
            table_data
                .into_iter()
                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
                .collect(),
        )
    }

    /// Creates a [`DdlTask`] to alter several logical tables in one batch.
    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
        DdlTask::AlterLogicalTables(
            table_data
                .into_iter()
                .map(|alter_table| AlterTableTask { alter_table })
                .collect(),
        )
    }

    /// Creates a [`DdlTask`] to drop a table identified by catalog/schema/name
    /// and resolved table id.
    pub fn new_drop_table(
        catalog: String,
        schema: String,
        table: String,
        table_id: TableId,
        drop_if_exists: bool,
    ) -> Self {
        DdlTask::DropTable(DropTableTask {
            catalog,
            schema,
            table,
            table_id,
            drop_if_exists,
        })
    }

    /// Creates a [`DdlTask`] to create a database (schema) with options.
    pub fn new_create_database(
        catalog: String,
        schema: String,
        create_if_not_exists: bool,
        options: HashMap<String, String>,
    ) -> Self {
        DdlTask::CreateDatabase(CreateDatabaseTask {
            catalog,
            schema,
            create_if_not_exists,
            options,
        })
    }

    /// Creates a [`DdlTask`] to drop a database (schema).
    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
        DdlTask::DropDatabase(DropDatabaseTask {
            catalog,
            schema,
            drop_if_exists,
        })
    }

    /// Creates a [`DdlTask`] to alter a database (schema).
    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
    }

    /// Creates a [`DdlTask`] to alter a table.
    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
        DdlTask::AlterTable(AlterTableTask { alter_table })
    }

    /// Creates a [`DdlTask`] to truncate a table; an empty `time_ranges`
    /// truncates the whole table.
    pub fn new_truncate_table(
        catalog: String,
        schema: String,
        table: String,
        table_id: TableId,
        time_ranges: Vec<(Timestamp, Timestamp)>,
    ) -> Self {
        DdlTask::TruncateTable(TruncateTableTask {
            catalog,
            schema,
            table,
            table_id,
            time_ranges,
        })
    }

    /// Creates a [`DdlTask`] to create a view from its expression and raw
    /// (table-shaped) view info.
    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
        DdlTask::CreateView(CreateViewTask {
            create_view,
            view_info,
        })
    }

    /// Creates a [`DdlTask`] to comment on a database object.
    pub fn new_comment_on(task: CommentOnTask) -> Self {
        DdlTask::CommentOn(task)
    }
}
211
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    /// Converts a protobuf `Task` into a [`DdlTask`], validating nested
    /// payloads along the way. Trigger tasks are rejected with an
    /// `Unsupported` error unless the `enterprise` feature is enabled.
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                // Batched logical-table creation: fail fast on the first bad task.
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            Task::CreateTriggerTask(create_trigger) => {
                // Enterprise-only: in OSS builds report the operation as unsupported.
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                // Enterprise-only: in OSS builds report the operation as unsupported.
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
        }
    }
}
292
/// A DDL task together with the query context it should be executed under
/// (current catalog/schema, timezone, etc.).
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    pub query_context: QueryContextRef,
    pub task: DdlTask,
}
298
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    /// Converts the request into its protobuf form. The `header` is left
    /// `None` for the RPC layer to fill in; the shared query context is
    /// cloned out of the `Arc` and converted.
    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
        };

        Ok(Self {
            header: None,
            query_context: Some((*request.query_context).clone().into()),
            task: Some(task),
        })
    }
}
353
/// Result of submitting a DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    // Procedure id key identifying the submitted procedure; empty if absent.
    pub key: Vec<u8>,
    // Ids of the tables affected/created by the task.
    pub table_ids: Vec<TableId>,
}
360
361impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
362 type Error = error::Error;
363
364 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
365 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
366 Ok(Self {
367 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
368 table_ids,
369 })
370 }
371}
372
373impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
374 fn from(val: SubmitDdlTaskResponse) -> Self {
375 Self {
376 pid: Some(ProcedureId { key: val.key }),
377 table_ids: val
378 .table_ids
379 .into_iter()
380 .map(|id| api::v1::TableId { id })
381 .collect(),
382 ..Default::default()
383 }
384 }
385}
386
/// A `CREATE VIEW` task: the view expression plus its table-shaped metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    pub create_view: CreateViewExpr,
    // View metadata stored in the same raw form as table info.
    pub view_info: RawTableInfo,
}
393
impl CreateViewTask {
    /// Returns a borrowed reference (catalog/schema/name) to the view.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
            table: &self.create_view.view_name,
        }
    }

    /// Returns the encoded logical plan of the view.
    pub fn raw_logical_plan(&self) -> &Vec<u8> {
        &self.create_view.logical_plan
    }

    /// Returns the SQL definition text of the view.
    pub fn view_definition(&self) -> &str {
        &self.create_view.definition
    }

    /// Returns the set of tables the view's query references.
    pub fn table_names(&self) -> HashSet<TableName> {
        self.create_view
            .table_names
            .iter()
            .map(|t| t.clone().into())
            .collect()
    }

    /// Returns the explicitly declared view columns.
    pub fn columns(&self) -> &Vec<String> {
        &self.create_view.columns
    }

    /// Returns the columns produced by the view's logical plan.
    pub fn plan_columns(&self) -> &Vec<String> {
        &self.create_view.plan_columns
    }
}
433
434impl TryFrom<PbCreateViewTask> for CreateViewTask {
435 type Error = error::Error;
436
437 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
438 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
439
440 Ok(CreateViewTask {
441 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
442 err_msg: "expected create view",
443 })?,
444 view_info,
445 })
446 }
447}
448
449impl TryFrom<CreateViewTask> for PbCreateViewTask {
450 type Error = error::Error;
451
452 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
453 Ok(PbCreateViewTask {
454 create_view: Some(task.create_view),
455 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
456 })
457 }
458}
459
460impl Serialize for CreateViewTask {
461 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
462 where
463 S: serde::Serializer,
464 {
465 let view_info = serde_json::to_vec(&self.view_info)
466 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
467
468 let pb = PbCreateViewTask {
469 create_view: Some(self.create_view.clone()),
470 view_info,
471 };
472 let buf = pb.encode_to_vec();
473 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
474 serializer.serialize_str(&encoded)
475 }
476}
477
478impl<'de> Deserialize<'de> for CreateViewTask {
479 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
480 where
481 D: serde::Deserializer<'de>,
482 {
483 let encoded = String::deserialize(deserializer)?;
484 let buf = general_purpose::STANDARD_NO_PAD
485 .decode(encoded)
486 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
487 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
488 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
489
490 let expr = CreateViewTask::try_from(expr)
491 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
492
493 Ok(expr)
494 }
495}
496
/// A `DROP VIEW` task identifying the view by name and resolved id.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    pub drop_if_exists: bool,
}
506
507impl DropViewTask {
508 pub fn table_ref(&self) -> TableReference<'_> {
510 TableReference {
511 catalog: &self.catalog,
512 schema: &self.schema,
513 table: &self.view,
514 }
515 }
516}
517
518impl TryFrom<PbDropViewTask> for DropViewTask {
519 type Error = error::Error;
520
521 fn try_from(pb: PbDropViewTask) -> Result<Self> {
522 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
523 err_msg: "expected drop view",
524 })?;
525
526 Ok(DropViewTask {
527 catalog: expr.catalog_name,
528 schema: expr.schema_name,
529 view: expr.view_name,
530 view_id: expr
531 .view_id
532 .context(error::InvalidProtoMsgSnafu {
533 err_msg: "expected view_id",
534 })?
535 .id,
536 drop_if_exists: expr.drop_if_exists,
537 })
538 }
539}
540
541impl From<DropViewTask> for PbDropViewTask {
542 fn from(task: DropViewTask) -> Self {
543 PbDropViewTask {
544 drop_view: Some(DropViewExpr {
545 catalog_name: task.catalog,
546 schema_name: task.schema,
547 view_name: task.view,
548 view_id: Some(api::v1::TableId { id: task.view_id }),
549 drop_if_exists: task.drop_if_exists,
550 }),
551 }
552 }
553}
554
/// A `DROP TABLE` task identifying the table by name and resolved id.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Defaults to false when deserializing older payloads without this field.
    #[serde(default)]
    pub drop_if_exists: bool,
}
564
565impl DropTableTask {
566 pub fn table_ref(&self) -> TableReference<'_> {
567 TableReference {
568 catalog: &self.catalog,
569 schema: &self.schema,
570 table: &self.table,
571 }
572 }
573
574 pub fn table_name(&self) -> TableName {
575 TableName {
576 catalog_name: self.catalog.clone(),
577 schema_name: self.schema.clone(),
578 table_name: self.table.clone(),
579 }
580 }
581}
582
583impl TryFrom<PbDropTableTask> for DropTableTask {
584 type Error = error::Error;
585
586 fn try_from(pb: PbDropTableTask) -> Result<Self> {
587 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
588 err_msg: "expected drop table",
589 })?;
590
591 Ok(Self {
592 catalog: drop_table.catalog_name,
593 schema: drop_table.schema_name,
594 table: drop_table.table_name,
595 table_id: drop_table
596 .table_id
597 .context(error::InvalidProtoMsgSnafu {
598 err_msg: "expected table_id",
599 })?
600 .id,
601 drop_if_exists: drop_table.drop_if_exists,
602 })
603 }
604}
605
606impl From<DropTableTask> for PbDropTableTask {
607 fn from(task: DropTableTask) -> Self {
608 PbDropTableTask {
609 drop_table: Some(DropTableExpr {
610 catalog_name: task.catalog,
611 schema_name: task.schema,
612 table_name: task.table,
613 table_id: Some(api::v1::TableId { id: task.table_id }),
614 drop_if_exists: task.drop_if_exists,
615 }),
616 }
617 }
618}
619
/// A `CREATE TABLE` task: the expression, its partition rules, and the raw
/// table metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    pub partitions: Vec<Partition>,
    pub table_info: RawTableInfo,
}
626
627impl TryFrom<PbCreateTableTask> for CreateTableTask {
628 type Error = error::Error;
629
630 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
631 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
632
633 Ok(CreateTableTask::new(
634 pb.create_table.context(error::InvalidProtoMsgSnafu {
635 err_msg: "expected create table",
636 })?,
637 pb.partitions,
638 table_info,
639 ))
640 }
641}
642
643impl TryFrom<CreateTableTask> for PbCreateTableTask {
644 type Error = error::Error;
645
646 fn try_from(task: CreateTableTask) -> Result<Self> {
647 Ok(PbCreateTableTask {
648 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
649 create_table: Some(task.create_table),
650 partitions: task.partitions,
651 })
652 }
653}
654
impl CreateTableTask {
    /// Bundles the create expression, partition rules and raw table info.
    pub fn new(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> CreateTableTask {
        CreateTableTask {
            create_table: expr,
            partitions,
            table_info,
        }
    }

    /// Returns an owned [`TableName`] for the table being created.
    pub fn table_name(&self) -> TableName {
        let table = &self.create_table;

        TableName {
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }

    /// Returns a borrowed reference (catalog/schema/name) to the table.
    pub fn table_ref(&self) -> TableReference<'_> {
        let table = &self.create_table;

        TableReference {
            catalog: &table.catalog_name,
            schema: &table.schema_name,
            table: &table.table_name,
        }
    }

    /// Records the allocated table id into the raw table info.
    pub fn set_table_id(&mut self, table_id: TableId) {
        self.table_info.ident.table_id = table_id;
    }

    /// Sorts column definitions by name, keeping the expression and the raw
    /// table info consistent with each other.
    pub fn sort_columns(&mut self) {
        self.create_table
            .column_defs
            .sort_unstable_by(|a, b| a.name.cmp(&b.name));

        // Must mirror the ordering applied above so both views agree.
        self.table_info.sort_columns();
    }
}
707
708impl Serialize for CreateTableTask {
709 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
710 where
711 S: serde::Serializer,
712 {
713 let table_info = serde_json::to_vec(&self.table_info)
714 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
715
716 let pb = PbCreateTableTask {
717 create_table: Some(self.create_table.clone()),
718 partitions: self.partitions.clone(),
719 table_info,
720 };
721 let buf = pb.encode_to_vec();
722 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
723 serializer.serialize_str(&encoded)
724 }
725}
726
727impl<'de> Deserialize<'de> for CreateTableTask {
728 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
729 where
730 D: serde::Deserializer<'de>,
731 {
732 let encoded = String::deserialize(deserializer)?;
733 let buf = general_purpose::STANDARD_NO_PAD
734 .decode(encoded)
735 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
736 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
737 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
738
739 let expr = CreateTableTask::try_from(expr)
740 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
741
742 Ok(expr)
743 }
744}
745
/// An `ALTER TABLE` task wrapping the alteration expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}
751
752impl AlterTableTask {
753 pub fn validate(&self) -> Result<()> {
754 self.alter_table
755 .kind
756 .as_ref()
757 .context(error::UnexpectedSnafu {
758 err_msg: "'kind' is absent",
759 })?;
760 Ok(())
761 }
762
763 pub fn table_ref(&self) -> TableReference<'_> {
764 TableReference {
765 catalog: &self.alter_table.catalog_name,
766 schema: &self.alter_table.schema_name,
767 table: &self.alter_table.table_name,
768 }
769 }
770
771 pub fn table_name(&self) -> TableName {
772 let table = &self.alter_table;
773
774 TableName {
775 catalog_name: table.catalog_name.clone(),
776 schema_name: table.schema_name.clone(),
777 table_name: table.table_name.clone(),
778 }
779 }
780}
781
782impl TryFrom<PbAlterTableTask> for AlterTableTask {
783 type Error = error::Error;
784
785 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
786 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
787 err_msg: "expected alter_table",
788 })?;
789
790 Ok(AlterTableTask { alter_table })
791 }
792}
793
794impl TryFrom<AlterTableTask> for PbAlterTableTask {
795 type Error = error::Error;
796
797 fn try_from(task: AlterTableTask) -> Result<Self> {
798 Ok(PbAlterTableTask {
799 alter_table: Some(task.alter_table),
800 })
801 }
802}
803
804impl Serialize for AlterTableTask {
805 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
806 where
807 S: serde::Serializer,
808 {
809 let pb = PbAlterTableTask {
810 alter_table: Some(self.alter_table.clone()),
811 };
812 let buf = pb.encode_to_vec();
813 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
814 serializer.serialize_str(&encoded)
815 }
816}
817
818impl<'de> Deserialize<'de> for AlterTableTask {
819 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
820 where
821 D: serde::Deserializer<'de>,
822 {
823 let encoded = String::deserialize(deserializer)?;
824 let buf = general_purpose::STANDARD_NO_PAD
825 .decode(encoded)
826 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
827 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
828 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
829
830 let expr = AlterTableTask::try_from(expr)
831 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
832
833 Ok(expr)
834 }
835}
836
/// A `TRUNCATE TABLE` task; an empty `time_ranges` truncates everything.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Inclusive start / exclusive end pairs selecting what to truncate —
    // NOTE(review): bound semantics not visible here; confirm in helper docs.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
845
846impl TruncateTableTask {
847 pub fn table_ref(&self) -> TableReference<'_> {
848 TableReference {
849 catalog: &self.catalog,
850 schema: &self.schema,
851 table: &self.table,
852 }
853 }
854
855 pub fn table_name(&self) -> TableName {
856 TableName {
857 catalog_name: self.catalog.clone(),
858 schema_name: self.schema.clone(),
859 table_name: self.table.clone(),
860 }
861 }
862}
863
864impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
865 type Error = error::Error;
866
867 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
868 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
869 err_msg: "expected truncate table",
870 })?;
871
872 Ok(Self {
873 catalog: truncate_table.catalog_name,
874 schema: truncate_table.schema_name,
875 table: truncate_table.table_name,
876 table_id: truncate_table
877 .table_id
878 .context(error::InvalidProtoMsgSnafu {
879 err_msg: "expected table_id",
880 })?
881 .id,
882 time_ranges: truncate_table
883 .time_ranges
884 .map(from_pb_time_ranges)
885 .transpose()
886 .map_err(BoxedError::new)
887 .context(ExternalSnafu)?
888 .unwrap_or_default(),
889 })
890 }
891}
892
893impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
894 type Error = error::Error;
895
896 fn try_from(task: TruncateTableTask) -> Result<Self> {
897 Ok(PbTruncateTableTask {
898 truncate_table: Some(TruncateTableExpr {
899 catalog_name: task.catalog,
900 schema_name: task.schema,
901 table_name: task.table,
902 table_id: Some(api::v1::TableId { id: task.table_id }),
903 time_ranges: Some(
904 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
905 ),
906 }),
907 })
908 }
909}
910
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    // Tolerates `null` options in persisted payloads by defaulting to empty.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
920
921impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
922 type Error = error::Error;
923
924 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
925 let CreateDatabaseExpr {
926 catalog_name,
927 schema_name,
928 create_if_not_exists,
929 options,
930 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
931 err_msg: "expected create database",
932 })?;
933
934 Ok(CreateDatabaseTask {
935 catalog: catalog_name,
936 schema: schema_name,
937 create_if_not_exists,
938 options,
939 })
940 }
941}
942
943impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
944 type Error = error::Error;
945
946 fn try_from(
947 CreateDatabaseTask {
948 catalog,
949 schema,
950 create_if_not_exists,
951 options,
952 }: CreateDatabaseTask,
953 ) -> Result<Self> {
954 Ok(PbCreateDatabaseTask {
955 create_database: Some(CreateDatabaseExpr {
956 catalog_name: catalog,
957 schema_name: schema,
958 create_if_not_exists,
959 options,
960 }),
961 })
962 }
963}
964
/// A `DROP DATABASE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub drop_if_exists: bool,
}
971
972impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
973 type Error = error::Error;
974
975 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
976 let DropDatabaseExpr {
977 catalog_name,
978 schema_name,
979 drop_if_exists,
980 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
981 err_msg: "expected drop database",
982 })?;
983
984 Ok(DropDatabaseTask {
985 catalog: catalog_name,
986 schema: schema_name,
987 drop_if_exists,
988 })
989 }
990}
991
992impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
993 type Error = error::Error;
994
995 fn try_from(
996 DropDatabaseTask {
997 catalog,
998 schema,
999 drop_if_exists,
1000 }: DropDatabaseTask,
1001 ) -> Result<Self> {
1002 Ok(PbDropDatabaseTask {
1003 drop_database: Some(DropDatabaseExpr {
1004 catalog_name: catalog,
1005 schema_name: schema,
1006 drop_if_exists,
1007 }),
1008 })
1009 }
1010}
1011
/// An `ALTER DATABASE` task wrapping the alteration expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1016
1017impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1018 type Error = error::Error;
1019
1020 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1021 Ok(PbAlterDatabaseTask {
1022 task: Some(task.alter_expr),
1023 })
1024 }
1025}
1026
1027impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1028 type Error = error::Error;
1029
1030 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1031 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1032 err_msg: "expected alter database",
1033 })?;
1034
1035 Ok(AlterDatabaseTask { alter_expr })
1036 }
1037}
1038
1039impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1040 type Error = error::Error;
1041
1042 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1043 match pb {
1044 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1045 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1046 options
1047 .set_database_options
1048 .into_iter()
1049 .map(SetDatabaseOption::try_from)
1050 .collect::<Result<Vec<_>>>()?,
1051 )))
1052 }
1053 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1054 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1055 options
1056 .keys
1057 .iter()
1058 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1059 .collect::<Result<Vec<_>>>()?,
1060 )),
1061 ),
1062 }
1063 }
1064}
1065
/// Database option key recognized for time-to-live settings.
const TTL_KEY: &str = "ttl";
1067
1068impl TryFrom<PbOption> for SetDatabaseOption {
1069 type Error = error::Error;
1070
1071 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1072 let key_lower = key.to_ascii_lowercase();
1073 match key_lower.as_str() {
1074 TTL_KEY => {
1075 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1076 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1077
1078 Ok(SetDatabaseOption::Ttl(ttl))
1079 }
1080 _ => {
1081 if validate_database_option(&key_lower) {
1082 Ok(SetDatabaseOption::Other(key_lower, value))
1083 } else {
1084 InvalidSetDatabaseOptionSnafu { key, value }.fail()
1085 }
1086 }
1087 }
1088 }
1089}
1090
/// A validated database option being set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    Ttl(DatabaseTimeToLive),
    // Lowercased key plus its raw value, already validated.
    Other(String, String),
}
1096
/// A validated database option being unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    Ttl,
    // Lowercased key, already validated.
    Other(String),
}
1102
1103impl TryFrom<&str> for UnsetDatabaseOption {
1104 type Error = error::Error;
1105
1106 fn try_from(key: &str) -> Result<Self> {
1107 let key_lower = key.to_ascii_lowercase();
1108 match key_lower.as_str() {
1109 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1110 _ => {
1111 if validate_database_option(&key_lower) {
1112 Ok(UnsetDatabaseOption::Other(key_lower))
1113 } else {
1114 InvalidUnsetDatabaseOptionSnafu { key }.fail()
1115 }
1116 }
1117 }
1118 }
1119}
1120
/// A list of options to set on a database.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1123
/// A list of options to unset on a database.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1126
/// The kind of alteration applied to a database.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1132
1133impl AlterDatabaseTask {
1134 pub fn catalog(&self) -> &str {
1135 &self.alter_expr.catalog_name
1136 }
1137
1138 pub fn schema(&self) -> &str {
1139 &self.alter_expr.catalog_name
1140 }
1141}
1142
/// A `CREATE FLOW` task describing a continuous query from source tables
/// into a sink table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub source_table_names: Vec<TableName>,
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    // Expiry carried over from the protobuf `ExpireAfter.value` —
    // NOTE(review): unit not visible here; confirm against proto definition.
    pub expire_after: Option<i64>,
    // Evaluation interval in seconds, from the protobuf `EvalInterval.seconds`.
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1159
impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
    type Error = error::Error;

    /// Requires the `create_flow` expression and its `sink_table_name`;
    /// unwraps the optional wrapper messages for expiry and eval interval.
    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
        let CreateFlowExpr {
            catalog_name,
            flow_name,
            source_table_names,
            sink_table_name,
            or_replace,
            create_if_not_exists,
            expire_after,
            eval_interval,
            comment,
            sql,
            flow_options,
        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected create_flow",
        })?;

        Ok(CreateFlowTask {
            catalog_name,
            flow_name,
            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
            sink_table_name: sink_table_name
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected sink_table_name",
                })?
                .into(),
            or_replace,
            create_if_not_exists,
            expire_after: expire_after.map(|e| e.value),
            eval_interval_secs: eval_interval.map(|e| e.seconds),
            comment,
            sql,
            flow_options,
        })
    }
}
1199
1200impl From<CreateFlowTask> for PbCreateFlowTask {
1201 fn from(
1202 CreateFlowTask {
1203 catalog_name,
1204 flow_name,
1205 source_table_names,
1206 sink_table_name,
1207 or_replace,
1208 create_if_not_exists,
1209 expire_after,
1210 eval_interval_secs: eval_interval,
1211 comment,
1212 sql,
1213 flow_options,
1214 }: CreateFlowTask,
1215 ) -> Self {
1216 PbCreateFlowTask {
1217 create_flow: Some(CreateFlowExpr {
1218 catalog_name,
1219 flow_name,
1220 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1221 sink_table_name: Some(sink_table_name.into()),
1222 or_replace,
1223 create_if_not_exists,
1224 expire_after: expire_after.map(|value| ExpireAfter { value }),
1225 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1226 comment,
1227 sql,
1228 flow_options,
1229 }),
1230 }
1231 }
1232}
1233
/// Drop flow task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    /// Catalog the flow belongs to.
    pub catalog_name: String,
    /// Name of the flow to drop.
    pub flow_name: String,
    /// Numeric id of the flow to drop.
    pub flow_id: FlowId,
    /// Whether `IF EXISTS` semantics apply.
    pub drop_if_exists: bool,
}
1242
1243impl TryFrom<PbDropFlowTask> for DropFlowTask {
1244 type Error = error::Error;
1245
1246 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1247 let DropFlowExpr {
1248 catalog_name,
1249 flow_name,
1250 flow_id,
1251 drop_if_exists,
1252 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1253 err_msg: "expected drop_flow",
1254 })?;
1255 let flow_id = flow_id
1256 .context(error::InvalidProtoMsgSnafu {
1257 err_msg: "expected flow_id",
1258 })?
1259 .id;
1260 Ok(DropFlowTask {
1261 catalog_name,
1262 flow_name,
1263 flow_id,
1264 drop_if_exists,
1265 })
1266 }
1267}
1268
1269impl From<DropFlowTask> for PbDropFlowTask {
1270 fn from(
1271 DropFlowTask {
1272 catalog_name,
1273 flow_name,
1274 flow_id,
1275 drop_if_exists,
1276 }: DropFlowTask,
1277 ) -> Self {
1278 PbDropFlowTask {
1279 drop_flow: Some(DropFlowExpr {
1280 catalog_name,
1281 flow_name,
1282 flow_id: Some(api::v1::FlowId { id: flow_id }),
1283 drop_if_exists,
1284 }),
1285 }
1286 }
1287}
1288
/// Resolved id of the object a comment is attached to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    /// A table (also used for column comments, which live on a table).
    Table(TableId),
    /// A flow.
    Flow(FlowId),
}
1295
/// Comment-on task: attaches or clears a comment on a table, column, or flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    /// Catalog containing the commented object.
    pub catalog_name: String,
    /// Schema containing the commented object.
    pub schema_name: String,
    /// What kind of object the comment targets.
    pub object_type: CommentObjectType,
    /// Name of the target object (table or flow name).
    pub object_name: String,
    /// Column name when `object_type` is `Column`; `None` otherwise.
    pub column_name: Option<String>,
    /// Resolved object id; populated during procedure execution,
    /// not carried over the wire (see the `TryFrom` impl below).
    pub object_id: Option<CommentObjectId>,
    /// The comment text; `None` clears the comment.
    pub comment: Option<String>,
}
1309
/// The kind of object a `COMMENT ON` statement targets.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    /// Comment on a table.
    Table,
    /// Comment on a column of a table.
    Column,
    /// Comment on a flow.
    Flow,
}
1316
impl CommentOnTask {
    /// Builds a [`TableReference`] from this task's catalog, schema, and
    /// object name.
    ///
    /// NOTE(review): `object_name` is used as the table slot; for `Flow`
    /// objects this is the flow name, so the reference is only meaningful
    /// as a locator — confirm with callers.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog_name,
            schema: &self.schema_name,
            table: &self.object_name,
        }
    }
}
1326
1327impl From<CommentObjectType> for PbCommentObjectType {
1329 fn from(object_type: CommentObjectType) -> Self {
1330 match object_type {
1331 CommentObjectType::Table => PbCommentObjectType::Table,
1332 CommentObjectType::Column => PbCommentObjectType::Column,
1333 CommentObjectType::Flow => PbCommentObjectType::Flow,
1334 }
1335 }
1336}
1337
1338impl TryFrom<i32> for CommentObjectType {
1339 type Error = error::Error;
1340
1341 fn try_from(value: i32) -> Result<Self> {
1342 match value {
1343 0 => Ok(CommentObjectType::Table),
1344 1 => Ok(CommentObjectType::Column),
1345 2 => Ok(CommentObjectType::Flow),
1346 _ => error::InvalidProtoMsgSnafu {
1347 err_msg: format!(
1348 "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1349 value
1350 ),
1351 }
1352 .fail(),
1353 }
1354 }
1355}
1356
1357impl TryFrom<PbCommentOnTask> for CommentOnTask {
1359 type Error = error::Error;
1360
1361 fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1362 let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1363 err_msg: "expected comment_on",
1364 })?;
1365
1366 Ok(CommentOnTask {
1367 catalog_name: comment_on.catalog_name,
1368 schema_name: comment_on.schema_name,
1369 object_type: comment_on.object_type.try_into()?,
1370 object_name: comment_on.object_name,
1371 column_name: if comment_on.column_name.is_empty() {
1372 None
1373 } else {
1374 Some(comment_on.column_name)
1375 },
1376 comment: if comment_on.comment.is_empty() {
1377 None
1378 } else {
1379 Some(comment_on.comment)
1380 },
1381 object_id: None,
1382 })
1383 }
1384}
1385
1386impl From<CommentOnTask> for PbCommentOnTask {
1387 fn from(task: CommentOnTask) -> Self {
1388 let pb_object_type: PbCommentObjectType = task.object_type.into();
1389 PbCommentOnTask {
1390 comment_on: Some(CommentOnExpr {
1391 catalog_name: task.catalog_name,
1392 schema_name: task.schema_name,
1393 object_type: pb_object_type as i32,
1394 object_name: task.object_name,
1395 column_name: task.column_name.unwrap_or_default(),
1396 comment: task.comment.unwrap_or_default(),
1397 }),
1398 }
1399 }
1400}
1401
/// Serializable snapshot of a session query context, carried inside DDL
/// task payloads.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    // Catalog active when the request was issued.
    pub(crate) current_catalog: String,
    // Schema (database) active when the request was issued.
    pub(crate) current_schema: String,
    // Timezone as a tz string (e.g. "UTC").
    pub(crate) timezone: String,
    // Arbitrary key-value extensions attached to the session.
    pub(crate) extensions: HashMap<String, String>,
    // Channel discriminant, stored narrowed to u8 (widened back on conversion).
    pub(crate) channel: u8,
}
1410
impl QueryContext {
    /// Returns the current catalog name.
    pub fn current_catalog(&self) -> &str {
        &self.current_catalog
    }

    /// Returns the current schema (database) name.
    pub fn current_schema(&self) -> &str {
        &self.current_schema
    }

    /// Returns the timezone string.
    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    /// Returns the extension key-value pairs attached to this context.
    pub fn extensions(&self) -> &HashMap<String, String> {
        &self.extensions
    }

    /// Returns the channel discriminant.
    pub fn channel(&self) -> u8 {
        self.channel
    }
}
1437
/// Minimal query context persisted with flows: only catalog, schema, and
/// timezone (extensions and channel are intentionally not stored).
/// Deserialization also accepts the legacy full [`QueryContext`] format;
/// see the manual `Deserialize` impl below.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    // Catalog the flow was created under.
    pub(crate) catalog: String,
    // Schema the flow was created under.
    pub(crate) schema: String,
    // Timezone as a tz string.
    pub(crate) timezone: String,
}
1450
impl<'de> Deserialize<'de> for FlowQueryContext {
    /// Deserializes a [`FlowQueryContext`], accepting both the current
    /// compact format (`catalog`/`schema`/`timezone`) and the legacy full
    /// [`QueryContext`] format for backward compatibility with previously
    /// persisted values.
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries the variants in declaration order, so the
        // compact `Flow` shape is attempted before the legacy `Full` one.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Mirror of the compact wire shape; a separate helper avoids
        // recursing into `FlowQueryContext::deserialize` itself.
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            // Legacy format: extensions/channel are dropped by the From impl.
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1481
1482impl From<QueryContextRef> for QueryContext {
1483 fn from(query_context: QueryContextRef) -> Self {
1484 QueryContext {
1485 current_catalog: query_context.current_catalog().to_string(),
1486 current_schema: query_context.current_schema().clone(),
1487 timezone: query_context.timezone().to_string(),
1488 extensions: query_context.extensions(),
1489 channel: query_context.channel() as u8,
1490 }
1491 }
1492}
1493
1494impl TryFrom<QueryContext> for session::context::QueryContext {
1495 type Error = error::Error;
1496 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1497 Ok(QueryContextBuilder::default()
1498 .current_catalog(value.current_catalog)
1499 .current_schema(value.current_schema)
1500 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1501 .extensions(value.extensions)
1502 .channel((value.channel as u32).into())
1503 .build())
1504 }
1505}
1506
1507impl From<QueryContext> for PbQueryContext {
1508 fn from(
1509 QueryContext {
1510 current_catalog,
1511 current_schema,
1512 timezone,
1513 extensions,
1514 channel,
1515 }: QueryContext,
1516 ) -> Self {
1517 PbQueryContext {
1518 current_catalog,
1519 current_schema,
1520 timezone,
1521 extensions,
1522 channel: channel as u32,
1523 snapshot_seqs: None,
1524 explain: None,
1525 }
1526 }
1527}
1528
1529impl From<QueryContext> for FlowQueryContext {
1530 fn from(ctx: QueryContext) -> Self {
1531 Self {
1532 catalog: ctx.current_catalog,
1533 schema: ctx.current_schema,
1534 timezone: ctx.timezone,
1535 }
1536 }
1537}
1538
1539impl From<QueryContextRef> for FlowQueryContext {
1540 fn from(ctx: QueryContextRef) -> Self {
1541 Self {
1542 catalog: ctx.current_catalog().to_string(),
1543 schema: ctx.current_schema().clone(),
1544 timezone: ctx.timezone().to_string(),
1545 }
1546 }
1547}
1548
1549impl From<FlowQueryContext> for QueryContext {
1550 fn from(flow_ctx: FlowQueryContext) -> Self {
1551 Self {
1552 current_catalog: flow_ctx.catalog,
1553 current_schema: flow_ctx.schema,
1554 timezone: flow_ctx.timezone,
1555 extensions: HashMap::new(),
1556 channel: 0, }
1558 }
1559}
1560
1561impl From<FlowQueryContext> for PbQueryContext {
1562 fn from(flow_ctx: FlowQueryContext) -> Self {
1563 let query_ctx: QueryContext = flow_ctx.into();
1564 query_ctx.into()
1565 }
1566}
1567
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    /// `CreateTableTask` must round-trip through serde_json unchanged.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// `AlterTableTask` must round-trip through serde_json unchanged.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// `sort_columns` must reorder the column defs alphabetically and remap
    /// the schema's timestamp/primary-key/value indices to match.
    #[test]
    fn test_sort_columns() {
        // Columns deliberately out of order: tag, time index, field.
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // Matching expr with the same (unsorted) column order.
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // Column defs are now in sorted order.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Indices follow the columns to their new positions.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    /// `QueryContext` -> `FlowQueryContext` keeps catalog/schema/timezone
    /// and drops extensions/channel.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    /// `FlowQueryContext` -> `QueryContext` fills neutral defaults and
    /// round-trips back losslessly.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    /// A live session context converts into a `FlowQueryContext` snapshot.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    /// `FlowQueryContext` serializes with the compact field names and
    /// round-trips through its custom `Deserialize` impl.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        // Wire format uses the short keys, not QueryContext's field names.
        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    /// `FlowQueryContext` -> `PbQueryContext` fills neutral defaults for
    /// fields a flow context does not carry.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}