1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32 DropViewTask as PbDropViewTask, Partition, ProcedureId,
33 TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
38 ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::Engine as _;
41use base64::engine::general_purpose;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{DefaultOnNull, serde_as};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::requests::validate_database_option;
51use table::table_name::TableName;
52use table::table_reference::TableReference;
53
54use crate::error::{
55 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
56 InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
57};
58use crate::key::FlowId;
59
/// A distributed DDL operation submitted to the procedure framework.
///
/// The `*LogicalTables` variants carry a batch of per-table tasks so that a
/// group of logical tables can be handled by a single procedure. The trigger
/// variants exist only when the `enterprise` feature is enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
82
83impl DdlTask {
84 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
86 DdlTask::CreateFlow(expr)
87 }
88
89 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
91 DdlTask::DropFlow(expr)
92 }
93
94 pub fn new_drop_view(expr: DropViewTask) -> Self {
96 DdlTask::DropView(expr)
97 }
98
99 pub fn new_create_table(
101 expr: CreateTableExpr,
102 partitions: Vec<Partition>,
103 table_info: RawTableInfo,
104 ) -> Self {
105 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
106 }
107
108 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
110 DdlTask::CreateLogicalTables(
111 table_data
112 .into_iter()
113 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
114 .collect(),
115 )
116 }
117
118 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
120 DdlTask::AlterLogicalTables(
121 table_data
122 .into_iter()
123 .map(|alter_table| AlterTableTask { alter_table })
124 .collect(),
125 )
126 }
127
128 pub fn new_drop_table(
130 catalog: String,
131 schema: String,
132 table: String,
133 table_id: TableId,
134 drop_if_exists: bool,
135 ) -> Self {
136 DdlTask::DropTable(DropTableTask {
137 catalog,
138 schema,
139 table,
140 table_id,
141 drop_if_exists,
142 })
143 }
144
145 pub fn new_create_database(
147 catalog: String,
148 schema: String,
149 create_if_not_exists: bool,
150 options: HashMap<String, String>,
151 ) -> Self {
152 DdlTask::CreateDatabase(CreateDatabaseTask {
153 catalog,
154 schema,
155 create_if_not_exists,
156 options,
157 })
158 }
159
160 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
162 DdlTask::DropDatabase(DropDatabaseTask {
163 catalog,
164 schema,
165 drop_if_exists,
166 })
167 }
168
169 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
171 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
172 }
173
174 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
176 DdlTask::AlterTable(AlterTableTask { alter_table })
177 }
178
179 pub fn new_truncate_table(
181 catalog: String,
182 schema: String,
183 table: String,
184 table_id: TableId,
185 time_ranges: Vec<(Timestamp, Timestamp)>,
186 ) -> Self {
187 DdlTask::TruncateTable(TruncateTableTask {
188 catalog,
189 schema,
190 table,
191 table_id,
192 time_ranges,
193 })
194 }
195
196 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
198 DdlTask::CreateView(CreateViewTask {
199 create_view,
200 view_info,
201 })
202 }
203}
204
// Converts the protobuf `Task` payload into the in-memory `DdlTask`.
// Batched (`*Tasks`) variants are converted element-wise and short-circuit
// on the first conversion error.
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            // Trigger DDL is an enterprise-only feature; without the feature
            // flag the request is rejected with an "unsupported" error.
            Task::CreateTriggerTask(create_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
        }
    }
}
284
/// A DDL task paired with the query context it should execute under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    pub query_context: QueryContextRef,
    pub task: DdlTask,
}
290
// Converts the in-memory request into the protobuf wire form.
// Batched variants are re-wrapped into their `Pb*Tasks` containers; the
// request header is left unset for the RPC layer to fill in.
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
        };

        Ok(Self {
            header: None,
            query_context: Some((*request.query_context).clone().into()),
            task: Some(task),
        })
    }
}
344
/// Result of submitting a DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Procedure key of the submitted task; empty when the response had none.
    pub key: Vec<u8>,
    /// Ids of the tables the task produced or affected.
    pub table_ids: Vec<TableId>,
}
351
352impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
353 type Error = error::Error;
354
355 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
356 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
357 Ok(Self {
358 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
359 table_ids,
360 })
361 }
362}
363
364impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
365 fn from(val: SubmitDdlTaskResponse) -> Self {
366 Self {
367 pid: Some(ProcedureId { key: val.key }),
368 table_ids: val
369 .table_ids
370 .into_iter()
371 .map(|id| api::v1::TableId { id })
372 .collect(),
373 ..Default::default()
374 }
375 }
376}
377
/// A `CREATE VIEW` task: the view expression plus its raw table info.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    pub create_view: CreateViewExpr,
    pub view_info: RawTableInfo,
}
384
impl CreateViewTask {
    /// Returns a borrowed (catalog, schema, view name) reference.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
            table: &self.create_view.view_name,
        }
    }

    /// Returns the encoded logical plan bytes of the view.
    pub fn raw_logical_plan(&self) -> &Vec<u8> {
        &self.create_view.logical_plan
    }

    /// Returns the SQL definition text of the view.
    pub fn view_definition(&self) -> &str {
        &self.create_view.definition
    }

    /// Returns the set of table names referenced by the view.
    pub fn table_names(&self) -> HashSet<TableName> {
        self.create_view
            .table_names
            .iter()
            .map(|t| t.clone().into())
            .collect()
    }

    /// Returns the declared output column names of the view.
    pub fn columns(&self) -> &Vec<String> {
        &self.create_view.columns
    }

    /// Returns the column names produced by the view's plan.
    pub fn plan_columns(&self) -> &Vec<String> {
        &self.create_view.plan_columns
    }
}
424
425impl TryFrom<PbCreateViewTask> for CreateViewTask {
426 type Error = error::Error;
427
428 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
429 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
430
431 Ok(CreateViewTask {
432 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
433 err_msg: "expected create view",
434 })?,
435 view_info,
436 })
437 }
438}
439
440impl TryFrom<CreateViewTask> for PbCreateViewTask {
441 type Error = error::Error;
442
443 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
444 Ok(PbCreateViewTask {
445 create_view: Some(task.create_view),
446 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
447 })
448 }
449}
450
451impl Serialize for CreateViewTask {
452 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
453 where
454 S: serde::Serializer,
455 {
456 let view_info = serde_json::to_vec(&self.view_info)
457 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
458
459 let pb = PbCreateViewTask {
460 create_view: Some(self.create_view.clone()),
461 view_info,
462 };
463 let buf = pb.encode_to_vec();
464 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
465 serializer.serialize_str(&encoded)
466 }
467}
468
469impl<'de> Deserialize<'de> for CreateViewTask {
470 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
471 where
472 D: serde::Deserializer<'de>,
473 {
474 let encoded = String::deserialize(deserializer)?;
475 let buf = general_purpose::STANDARD_NO_PAD
476 .decode(encoded)
477 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
478 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
479 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
480
481 let expr = CreateViewTask::try_from(expr)
482 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
483
484 Ok(expr)
485 }
486}
487
/// A `DROP VIEW` task identifying the view by name and id.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    /// When true, dropping a non-existent view is not an error.
    pub drop_if_exists: bool,
}
497
impl DropViewTask {
    /// Returns a borrowed (catalog, schema, view name) reference.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.view,
        }
    }
}
508
509impl TryFrom<PbDropViewTask> for DropViewTask {
510 type Error = error::Error;
511
512 fn try_from(pb: PbDropViewTask) -> Result<Self> {
513 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
514 err_msg: "expected drop view",
515 })?;
516
517 Ok(DropViewTask {
518 catalog: expr.catalog_name,
519 schema: expr.schema_name,
520 view: expr.view_name,
521 view_id: expr
522 .view_id
523 .context(error::InvalidProtoMsgSnafu {
524 err_msg: "expected view_id",
525 })?
526 .id,
527 drop_if_exists: expr.drop_if_exists,
528 })
529 }
530}
531
532impl From<DropViewTask> for PbDropViewTask {
533 fn from(task: DropViewTask) -> Self {
534 PbDropViewTask {
535 drop_view: Some(DropViewExpr {
536 catalog_name: task.catalog,
537 schema_name: task.schema,
538 view_name: task.view,
539 view_id: Some(api::v1::TableId { id: task.view_id }),
540 drop_if_exists: task.drop_if_exists,
541 }),
542 }
543 }
544}
545
/// A `DROP TABLE` task identifying the table by name and id.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    /// Defaults to false when absent in serialized form.
    #[serde(default)]
    pub drop_if_exists: bool,
}
555
impl DropTableTask {
    /// Returns a borrowed (catalog, schema, table) reference.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.table,
        }
    }

    /// Returns the owned fully-qualified table name.
    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}
573
574impl TryFrom<PbDropTableTask> for DropTableTask {
575 type Error = error::Error;
576
577 fn try_from(pb: PbDropTableTask) -> Result<Self> {
578 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
579 err_msg: "expected drop table",
580 })?;
581
582 Ok(Self {
583 catalog: drop_table.catalog_name,
584 schema: drop_table.schema_name,
585 table: drop_table.table_name,
586 table_id: drop_table
587 .table_id
588 .context(error::InvalidProtoMsgSnafu {
589 err_msg: "expected table_id",
590 })?
591 .id,
592 drop_if_exists: drop_table.drop_if_exists,
593 })
594 }
595}
596
597impl From<DropTableTask> for PbDropTableTask {
598 fn from(task: DropTableTask) -> Self {
599 PbDropTableTask {
600 drop_table: Some(DropTableExpr {
601 catalog_name: task.catalog,
602 schema_name: task.schema,
603 table_name: task.table,
604 table_id: Some(api::v1::TableId { id: task.table_id }),
605 drop_if_exists: task.drop_if_exists,
606 }),
607 }
608 }
609}
610
/// A `CREATE TABLE` task: expression, partition rules and raw table info.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    pub partitions: Vec<Partition>,
    pub table_info: RawTableInfo,
}
617
618impl TryFrom<PbCreateTableTask> for CreateTableTask {
619 type Error = error::Error;
620
621 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
622 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
623
624 Ok(CreateTableTask::new(
625 pb.create_table.context(error::InvalidProtoMsgSnafu {
626 err_msg: "expected create table",
627 })?,
628 pb.partitions,
629 table_info,
630 ))
631 }
632}
633
634impl TryFrom<CreateTableTask> for PbCreateTableTask {
635 type Error = error::Error;
636
637 fn try_from(task: CreateTableTask) -> Result<Self> {
638 Ok(PbCreateTableTask {
639 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
640 create_table: Some(task.create_table),
641 partitions: task.partitions,
642 })
643 }
644}
645
646impl CreateTableTask {
647 pub fn new(
648 expr: CreateTableExpr,
649 partitions: Vec<Partition>,
650 table_info: RawTableInfo,
651 ) -> CreateTableTask {
652 CreateTableTask {
653 create_table: expr,
654 partitions,
655 table_info,
656 }
657 }
658
659 pub fn table_name(&self) -> TableName {
660 let table = &self.create_table;
661
662 TableName {
663 catalog_name: table.catalog_name.clone(),
664 schema_name: table.schema_name.clone(),
665 table_name: table.table_name.clone(),
666 }
667 }
668
669 pub fn table_ref(&self) -> TableReference<'_> {
670 let table = &self.create_table;
671
672 TableReference {
673 catalog: &table.catalog_name,
674 schema: &table.schema_name,
675 table: &table.table_name,
676 }
677 }
678
679 pub fn set_table_id(&mut self, table_id: TableId) {
681 self.table_info.ident.table_id = table_id;
682 }
683
684 pub fn sort_columns(&mut self) {
689 self.create_table
692 .column_defs
693 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
694
695 self.table_info.sort_columns();
696 }
697}
698
699impl Serialize for CreateTableTask {
700 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
701 where
702 S: serde::Serializer,
703 {
704 let table_info = serde_json::to_vec(&self.table_info)
705 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
706
707 let pb = PbCreateTableTask {
708 create_table: Some(self.create_table.clone()),
709 partitions: self.partitions.clone(),
710 table_info,
711 };
712 let buf = pb.encode_to_vec();
713 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
714 serializer.serialize_str(&encoded)
715 }
716}
717
718impl<'de> Deserialize<'de> for CreateTableTask {
719 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
720 where
721 D: serde::Deserializer<'de>,
722 {
723 let encoded = String::deserialize(deserializer)?;
724 let buf = general_purpose::STANDARD_NO_PAD
725 .decode(encoded)
726 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
727 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
728 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
729
730 let expr = CreateTableTask::try_from(expr)
731 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
732
733 Ok(expr)
734 }
735}
736
/// An `ALTER TABLE` task wrapping the alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}
742
743impl AlterTableTask {
744 pub fn validate(&self) -> Result<()> {
745 self.alter_table
746 .kind
747 .as_ref()
748 .context(error::UnexpectedSnafu {
749 err_msg: "'kind' is absent",
750 })?;
751 Ok(())
752 }
753
754 pub fn table_ref(&self) -> TableReference<'_> {
755 TableReference {
756 catalog: &self.alter_table.catalog_name,
757 schema: &self.alter_table.schema_name,
758 table: &self.alter_table.table_name,
759 }
760 }
761
762 pub fn table_name(&self) -> TableName {
763 let table = &self.alter_table;
764
765 TableName {
766 catalog_name: table.catalog_name.clone(),
767 schema_name: table.schema_name.clone(),
768 table_name: table.table_name.clone(),
769 }
770 }
771}
772
773impl TryFrom<PbAlterTableTask> for AlterTableTask {
774 type Error = error::Error;
775
776 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
777 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
778 err_msg: "expected alter_table",
779 })?;
780
781 Ok(AlterTableTask { alter_table })
782 }
783}
784
785impl TryFrom<AlterTableTask> for PbAlterTableTask {
786 type Error = error::Error;
787
788 fn try_from(task: AlterTableTask) -> Result<Self> {
789 Ok(PbAlterTableTask {
790 alter_table: Some(task.alter_table),
791 })
792 }
793}
794
795impl Serialize for AlterTableTask {
796 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
797 where
798 S: serde::Serializer,
799 {
800 let pb = PbAlterTableTask {
801 alter_table: Some(self.alter_table.clone()),
802 };
803 let buf = pb.encode_to_vec();
804 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
805 serializer.serialize_str(&encoded)
806 }
807}
808
809impl<'de> Deserialize<'de> for AlterTableTask {
810 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
811 where
812 D: serde::Deserializer<'de>,
813 {
814 let encoded = String::deserialize(deserializer)?;
815 let buf = general_purpose::STANDARD_NO_PAD
816 .decode(encoded)
817 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
818 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
819 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
820
821 let expr = AlterTableTask::try_from(expr)
822 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
823
824 Ok(expr)
825 }
826}
827
/// A `TRUNCATE TABLE` task; `time_ranges` may restrict what is truncated
/// (empty means the whole table).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
836
impl TruncateTableTask {
    /// Returns a borrowed (catalog, schema, table) reference.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.catalog,
            schema: &self.schema,
            table: &self.table,
        }
    }

    /// Returns the owned fully-qualified table name.
    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}
854
// Unpacks the protobuf task; `truncate_table` and its `table_id` must be set.
impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
    type Error = error::Error;

    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected truncate table",
        })?;

        Ok(Self {
            catalog: truncate_table.catalog_name,
            schema: truncate_table.schema_name,
            table: truncate_table.table_name,
            table_id: truncate_table
                .table_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected table_id",
                })?
                .id,
            // `time_ranges` is optional on the wire: absence maps to an empty
            // list; a present-but-invalid value surfaces as an external error.
            time_ranges: truncate_table
                .time_ranges
                .map(from_pb_time_ranges)
                .transpose()
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .unwrap_or_default(),
        })
    }
}
883
884impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
885 type Error = error::Error;
886
887 fn try_from(task: TruncateTableTask) -> Result<Self> {
888 Ok(PbTruncateTableTask {
889 truncate_table: Some(TruncateTableExpr {
890 catalog_name: task.catalog,
891 schema_name: task.schema,
892 table_name: task.table,
893 table_id: Some(api::v1::TableId { id: task.table_id }),
894 time_ranges: Some(
895 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
896 ),
897 }),
898 })
899 }
900}
901
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    /// Database options; a serialized `null` deserializes as an empty map.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
911
912impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
913 type Error = error::Error;
914
915 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
916 let CreateDatabaseExpr {
917 catalog_name,
918 schema_name,
919 create_if_not_exists,
920 options,
921 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
922 err_msg: "expected create database",
923 })?;
924
925 Ok(CreateDatabaseTask {
926 catalog: catalog_name,
927 schema: schema_name,
928 create_if_not_exists,
929 options,
930 })
931 }
932}
933
934impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
935 type Error = error::Error;
936
937 fn try_from(
938 CreateDatabaseTask {
939 catalog,
940 schema,
941 create_if_not_exists,
942 options,
943 }: CreateDatabaseTask,
944 ) -> Result<Self> {
945 Ok(PbCreateDatabaseTask {
946 create_database: Some(CreateDatabaseExpr {
947 catalog_name: catalog,
948 schema_name: schema,
949 create_if_not_exists,
950 options,
951 }),
952 })
953 }
954}
955
/// A `DROP DATABASE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub drop_if_exists: bool,
}
962
963impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
964 type Error = error::Error;
965
966 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
967 let DropDatabaseExpr {
968 catalog_name,
969 schema_name,
970 drop_if_exists,
971 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
972 err_msg: "expected drop database",
973 })?;
974
975 Ok(DropDatabaseTask {
976 catalog: catalog_name,
977 schema: schema_name,
978 drop_if_exists,
979 })
980 }
981}
982
983impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
984 type Error = error::Error;
985
986 fn try_from(
987 DropDatabaseTask {
988 catalog,
989 schema,
990 drop_if_exists,
991 }: DropDatabaseTask,
992 ) -> Result<Self> {
993 Ok(PbDropDatabaseTask {
994 drop_database: Some(DropDatabaseExpr {
995 catalog_name: catalog,
996 schema_name: schema,
997 drop_if_exists,
998 }),
999 })
1000 }
1001}
1002
/// An `ALTER DATABASE` task wrapping the alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1007
1008impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1009 type Error = error::Error;
1010
1011 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1012 Ok(PbAlterDatabaseTask {
1013 task: Some(task.alter_expr),
1014 })
1015 }
1016}
1017
1018impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1019 type Error = error::Error;
1020
1021 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1022 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1023 err_msg: "expected alter database",
1024 })?;
1025
1026 Ok(AlterDatabaseTask { alter_expr })
1027 }
1028}
1029
1030impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1031 type Error = error::Error;
1032
1033 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1034 match pb {
1035 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1036 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1037 options
1038 .set_database_options
1039 .into_iter()
1040 .map(SetDatabaseOption::try_from)
1041 .collect::<Result<Vec<_>>>()?,
1042 )))
1043 }
1044 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1045 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1046 options
1047 .keys
1048 .iter()
1049 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1050 .collect::<Result<Vec<_>>>()?,
1051 )),
1052 ),
1053 }
1054 }
1055}
1056
/// Lowercase database-option key for the time-to-live setting.
const TTL_KEY: &str = "ttl";
1058
1059impl TryFrom<PbOption> for SetDatabaseOption {
1060 type Error = error::Error;
1061
1062 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1063 let key_lower = key.to_ascii_lowercase();
1064 match key_lower.as_str() {
1065 TTL_KEY => {
1066 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1067 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1068
1069 Ok(SetDatabaseOption::Ttl(ttl))
1070 }
1071 _ => {
1072 if validate_database_option(&key_lower) {
1073 Ok(SetDatabaseOption::Other(key_lower, value))
1074 } else {
1075 InvalidSetDatabaseOptionSnafu { key, value }.fail()
1076 }
1077 }
1078 }
1079 }
1080}
1081
/// A database option to set, parsed from a validated key/value pair.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    Ttl(DatabaseTimeToLive),
    Other(String, String),
}

/// A database option to unset, parsed from a validated key.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    Ttl,
    Other(String),
}
1093
1094impl TryFrom<&str> for UnsetDatabaseOption {
1095 type Error = error::Error;
1096
1097 fn try_from(key: &str) -> Result<Self> {
1098 let key_lower = key.to_ascii_lowercase();
1099 match key_lower.as_str() {
1100 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1101 _ => {
1102 if validate_database_option(&key_lower) {
1103 Ok(UnsetDatabaseOption::Other(key_lower))
1104 } else {
1105 InvalidUnsetDatabaseOptionSnafu { key }.fail()
1106 }
1107 }
1108 }
1109 }
1110}
1111
/// A validated list of options to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);

/// A validated list of option keys to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);

/// The kind of change an `ALTER DATABASE` performs.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1123
1124impl AlterDatabaseTask {
1125 pub fn catalog(&self) -> &str {
1126 &self.alter_expr.catalog_name
1127 }
1128
1129 pub fn schema(&self) -> &str {
1130 &self.alter_expr.catalog_name
1131 }
1132}
1133
/// A `CREATE FLOW` task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub source_table_names: Vec<TableName>,
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    /// Expiration carried over from the protobuf `ExpireAfter.value`.
    pub expire_after: Option<i64>,
    /// Evaluation interval carried over from the protobuf `EvalInterval.seconds`.
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1150
1151impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1152 type Error = error::Error;
1153
1154 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1155 let CreateFlowExpr {
1156 catalog_name,
1157 flow_name,
1158 source_table_names,
1159 sink_table_name,
1160 or_replace,
1161 create_if_not_exists,
1162 expire_after,
1163 eval_interval,
1164 comment,
1165 sql,
1166 flow_options,
1167 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1168 err_msg: "expected create_flow",
1169 })?;
1170
1171 Ok(CreateFlowTask {
1172 catalog_name,
1173 flow_name,
1174 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1175 sink_table_name: sink_table_name
1176 .context(error::InvalidProtoMsgSnafu {
1177 err_msg: "expected sink_table_name",
1178 })?
1179 .into(),
1180 or_replace,
1181 create_if_not_exists,
1182 expire_after: expire_after.map(|e| e.value),
1183 eval_interval_secs: eval_interval.map(|e| e.seconds),
1184 comment,
1185 sql,
1186 flow_options,
1187 })
1188 }
1189}
1190
1191impl From<CreateFlowTask> for PbCreateFlowTask {
1192 fn from(
1193 CreateFlowTask {
1194 catalog_name,
1195 flow_name,
1196 source_table_names,
1197 sink_table_name,
1198 or_replace,
1199 create_if_not_exists,
1200 expire_after,
1201 eval_interval_secs: eval_interval,
1202 comment,
1203 sql,
1204 flow_options,
1205 }: CreateFlowTask,
1206 ) -> Self {
1207 PbCreateFlowTask {
1208 create_flow: Some(CreateFlowExpr {
1209 catalog_name,
1210 flow_name,
1211 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1212 sink_table_name: Some(sink_table_name.into()),
1213 or_replace,
1214 create_if_not_exists,
1215 expire_after: expire_after.map(|value| ExpireAfter { value }),
1216 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1217 comment,
1218 sql,
1219 flow_options,
1220 }),
1221 }
1222 }
1223}
1224
/// A task that drops an existing flow, identified by both name and id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    // Numeric id of the flow to drop (pb `FlowId.id`).
    pub flow_id: FlowId,
    // When true, dropping a non-existent flow is not an error (DROP ... IF EXISTS).
    pub drop_if_exists: bool,
}
1233
1234impl TryFrom<PbDropFlowTask> for DropFlowTask {
1235 type Error = error::Error;
1236
1237 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1238 let DropFlowExpr {
1239 catalog_name,
1240 flow_name,
1241 flow_id,
1242 drop_if_exists,
1243 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1244 err_msg: "expected drop_flow",
1245 })?;
1246 let flow_id = flow_id
1247 .context(error::InvalidProtoMsgSnafu {
1248 err_msg: "expected flow_id",
1249 })?
1250 .id;
1251 Ok(DropFlowTask {
1252 catalog_name,
1253 flow_name,
1254 flow_id,
1255 drop_if_exists,
1256 })
1257 }
1258}
1259
1260impl From<DropFlowTask> for PbDropFlowTask {
1261 fn from(
1262 DropFlowTask {
1263 catalog_name,
1264 flow_name,
1265 flow_id,
1266 drop_if_exists,
1267 }: DropFlowTask,
1268 ) -> Self {
1269 PbDropFlowTask {
1270 drop_flow: Some(DropFlowExpr {
1271 catalog_name,
1272 flow_name,
1273 flow_id: Some(api::v1::FlowId { id: flow_id }),
1274 drop_if_exists,
1275 }),
1276 }
1277 }
1278}
1279
/// Serializable snapshot of a session query context.
///
/// Captures the fields of [`session::context::QueryContext`] needed to
/// reconstruct it later (see the `From`/`TryFrom` impls below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    pub(crate) current_catalog: String,
    pub(crate) current_schema: String,
    // Timezone name as a string; parsed via `Timezone::from_tz_string` when
    // converting back to a session context.
    pub(crate) timezone: String,
    // Extra key/value extensions carried with the context.
    pub(crate) extensions: HashMap<String, String>,
    // Channel the request arrived on, narrowed from the session's value
    // (stored as u8; widened back to u32 for pb/session conversions).
    pub(crate) channel: u8,
}
1288
1289impl QueryContext {
1290 pub fn current_catalog(&self) -> &str {
1292 &self.current_catalog
1293 }
1294
1295 pub fn current_schema(&self) -> &str {
1297 &self.current_schema
1298 }
1299
1300 pub fn timezone(&self) -> &str {
1302 &self.timezone
1303 }
1304
1305 pub fn extensions(&self) -> &HashMap<String, String> {
1307 &self.extensions
1308 }
1309
1310 pub fn channel(&self) -> u8 {
1312 self.channel
1313 }
1314}
1315
/// Slim query context stored for flows: only catalog, schema and timezone.
///
/// Unlike [`QueryContext`], it carries no extensions or channel; converting
/// back fills those with defaults (see `From<FlowQueryContext> for QueryContext`).
/// Deserialization is manual to also accept a serialized full [`QueryContext`].
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    pub(crate) catalog: String,
    pub(crate) schema: String,
    pub(crate) timezone: String,
}
1328
impl<'de> Deserialize<'de> for FlowQueryContext {
    /// Custom deserialization that accepts two input shapes:
    /// the slim flow form (`catalog`/`schema`/`timezone`) and a full
    /// [`QueryContext`] (`current_catalog`/`current_schema`/...), which is
    /// converted via `From<QueryContext>` — presumably for backward
    /// compatibility with previously serialized contexts (confirm against
    /// stored metadata versions).
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries the variants in order and picks the first
        // whose fields match the input, so the slim form is preferred.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Field-for-field mirror of `FlowQueryContext`. A derived
        // `Deserialize` on the outer type would recurse into this impl,
        // hence the helper.
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1359
1360impl From<QueryContextRef> for QueryContext {
1361 fn from(query_context: QueryContextRef) -> Self {
1362 QueryContext {
1363 current_catalog: query_context.current_catalog().to_string(),
1364 current_schema: query_context.current_schema().clone(),
1365 timezone: query_context.timezone().to_string(),
1366 extensions: query_context.extensions(),
1367 channel: query_context.channel() as u8,
1368 }
1369 }
1370}
1371
1372impl TryFrom<QueryContext> for session::context::QueryContext {
1373 type Error = error::Error;
1374 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1375 Ok(QueryContextBuilder::default()
1376 .current_catalog(value.current_catalog)
1377 .current_schema(value.current_schema)
1378 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1379 .extensions(value.extensions)
1380 .channel((value.channel as u32).into())
1381 .build())
1382 }
1383}
1384
1385impl From<QueryContext> for PbQueryContext {
1386 fn from(
1387 QueryContext {
1388 current_catalog,
1389 current_schema,
1390 timezone,
1391 extensions,
1392 channel,
1393 }: QueryContext,
1394 ) -> Self {
1395 PbQueryContext {
1396 current_catalog,
1397 current_schema,
1398 timezone,
1399 extensions,
1400 channel: channel as u32,
1401 snapshot_seqs: None,
1402 explain: None,
1403 }
1404 }
1405}
1406
1407impl From<QueryContext> for FlowQueryContext {
1408 fn from(ctx: QueryContext) -> Self {
1409 Self {
1410 catalog: ctx.current_catalog,
1411 schema: ctx.current_schema,
1412 timezone: ctx.timezone,
1413 }
1414 }
1415}
1416
1417impl From<QueryContextRef> for FlowQueryContext {
1418 fn from(ctx: QueryContextRef) -> Self {
1419 Self {
1420 catalog: ctx.current_catalog().to_string(),
1421 schema: ctx.current_schema().clone(),
1422 timezone: ctx.timezone().to_string(),
1423 }
1424 }
1425}
1426
1427impl From<FlowQueryContext> for QueryContext {
1428 fn from(flow_ctx: FlowQueryContext) -> Self {
1429 Self {
1430 current_catalog: flow_ctx.catalog,
1431 current_schema: flow_ctx.schema,
1432 timezone: flow_ctx.timezone,
1433 extensions: HashMap::new(),
1434 channel: 0, }
1436 }
1437}
1438
1439impl From<FlowQueryContext> for PbQueryContext {
1440 fn from(flow_ctx: FlowQueryContext) -> Self {
1441 let query_ctx: QueryContext = flow_ctx.into();
1442 query_ctx.into()
1443 }
1444}
1445
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    // JSON serialization of a CreateTableTask must round-trip losslessly.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // JSON serialization of an AlterTableTask must round-trip losslessly.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // `sort_columns` must reorder column defs and rewrite the schema's
    // index vectors (timestamp_index, primary_key_indices, value_indices)
    // consistently with the new column order.
    #[test]
    fn test_sort_columns() {
        // Columns deliberately out of order: column3 (tag), column1 (time
        // index), column2 (field).
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // The create expression lists the same columns in the same
        // (unsorted) order as the raw schema above.
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // After sorting, the defs come out as column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Index vectors must track the new positions: time index moved to 0,
        // the primary key (column3) to 2, values (column1, column2) to 0 and 1.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    // Narrowing QueryContext -> FlowQueryContext keeps catalog/schema/timezone
    // and drops extensions and channel.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    // Widening FlowQueryContext -> QueryContext fills extensions/channel with
    // defaults, and the narrow->widen round-trip is lossless.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    // A live session context (QueryContextRef) converts into the slim flow form.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    // Serde round-trip for FlowQueryContext, plus a check that the JSON uses
    // the slim field names (catalog/schema/timezone), which the custom
    // Deserialize impl prefers.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    // FlowQueryContext -> PbQueryContext goes through QueryContext, so the
    // pb message gets default extensions/channel and unset optional fields.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}