1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32 DropViewTask as PbDropViewTask, Partition, ProcedureId,
33 TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
38 ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::Engine as _;
41use base64::engine::general_purpose;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{DefaultOnNull, serde_as};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::table_name::TableName;
51use table::table_reference::TableReference;
52
53use crate::error::{
54 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
55 InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
56};
57use crate::key::FlowId;
58
/// A DDL operation in this crate's own representation.
///
/// Each variant wraps the payload for one kind of DDL task. The
/// `*LogicalTables` variants carry a batch of table tasks.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    // Trigger variants only exist when the `enterprise` feature is enabled.
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    // Trigger variants only exist when the `enterprise` feature is enabled.
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
81
82impl DdlTask {
83 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
85 DdlTask::CreateFlow(expr)
86 }
87
88 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
90 DdlTask::DropFlow(expr)
91 }
92
93 pub fn new_drop_view(expr: DropViewTask) -> Self {
95 DdlTask::DropView(expr)
96 }
97
98 pub fn new_create_table(
100 expr: CreateTableExpr,
101 partitions: Vec<Partition>,
102 table_info: RawTableInfo,
103 ) -> Self {
104 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
105 }
106
107 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
109 DdlTask::CreateLogicalTables(
110 table_data
111 .into_iter()
112 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
113 .collect(),
114 )
115 }
116
117 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
119 DdlTask::AlterLogicalTables(
120 table_data
121 .into_iter()
122 .map(|alter_table| AlterTableTask { alter_table })
123 .collect(),
124 )
125 }
126
127 pub fn new_drop_table(
129 catalog: String,
130 schema: String,
131 table: String,
132 table_id: TableId,
133 drop_if_exists: bool,
134 ) -> Self {
135 DdlTask::DropTable(DropTableTask {
136 catalog,
137 schema,
138 table,
139 table_id,
140 drop_if_exists,
141 })
142 }
143
144 pub fn new_create_database(
146 catalog: String,
147 schema: String,
148 create_if_not_exists: bool,
149 options: HashMap<String, String>,
150 ) -> Self {
151 DdlTask::CreateDatabase(CreateDatabaseTask {
152 catalog,
153 schema,
154 create_if_not_exists,
155 options,
156 })
157 }
158
159 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
161 DdlTask::DropDatabase(DropDatabaseTask {
162 catalog,
163 schema,
164 drop_if_exists,
165 })
166 }
167
168 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
170 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
171 }
172
173 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
175 DdlTask::AlterTable(AlterTableTask { alter_table })
176 }
177
178 pub fn new_truncate_table(
180 catalog: String,
181 schema: String,
182 table: String,
183 table_id: TableId,
184 time_ranges: Vec<(Timestamp, Timestamp)>,
185 ) -> Self {
186 DdlTask::TruncateTable(TruncateTableTask {
187 catalog,
188 schema,
189 table,
190 table_id,
191 time_ranges,
192 })
193 }
194
195 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
197 DdlTask::CreateView(CreateViewTask {
198 create_view,
199 view_info,
200 })
201 }
202}
203
204impl TryFrom<Task> for DdlTask {
205 type Error = error::Error;
206 fn try_from(task: Task) -> Result<Self> {
207 match task {
208 Task::CreateTableTask(create_table) => {
209 Ok(DdlTask::CreateTable(create_table.try_into()?))
210 }
211 Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
212 Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
213 Task::TruncateTableTask(truncate_table) => {
214 Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
215 }
216 Task::CreateTableTasks(create_tables) => {
217 let tasks = create_tables
218 .tasks
219 .into_iter()
220 .map(|task| task.try_into())
221 .collect::<Result<Vec<_>>>()?;
222
223 Ok(DdlTask::CreateLogicalTables(tasks))
224 }
225 Task::DropTableTasks(drop_tables) => {
226 let tasks = drop_tables
227 .tasks
228 .into_iter()
229 .map(|task| task.try_into())
230 .collect::<Result<Vec<_>>>()?;
231
232 Ok(DdlTask::DropLogicalTables(tasks))
233 }
234 Task::AlterTableTasks(alter_tables) => {
235 let tasks = alter_tables
236 .tasks
237 .into_iter()
238 .map(|task| task.try_into())
239 .collect::<Result<Vec<_>>>()?;
240
241 Ok(DdlTask::AlterLogicalTables(tasks))
242 }
243 Task::CreateDatabaseTask(create_database) => {
244 Ok(DdlTask::CreateDatabase(create_database.try_into()?))
245 }
246 Task::DropDatabaseTask(drop_database) => {
247 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
248 }
249 Task::AlterDatabaseTask(alter_database) => {
250 Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
251 }
252 Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
253 Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
254 Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
255 Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
256 Task::CreateTriggerTask(create_trigger) => {
257 #[cfg(feature = "enterprise")]
258 return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
259 #[cfg(not(feature = "enterprise"))]
260 {
261 let _ = create_trigger;
262 crate::error::UnsupportedSnafu {
263 operation: "create trigger",
264 }
265 .fail()
266 }
267 }
268 Task::DropTriggerTask(drop_trigger) => {
269 #[cfg(feature = "enterprise")]
270 return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
271 #[cfg(not(feature = "enterprise"))]
272 {
273 let _ = drop_trigger;
274 crate::error::UnsupportedSnafu {
275 operation: "drop trigger",
276 }
277 .fail()
278 }
279 }
280 }
281 }
282}
283
/// A DDL task together with the query context it is submitted under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    pub query_context: QueryContextRef,
    pub task: DdlTask,
}
289
290impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
291 type Error = error::Error;
292
293 fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
294 let task = match request.task {
295 DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
296 DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
297 DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
298 DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
299 DdlTask::CreateLogicalTables(tasks) => {
300 let tasks = tasks
301 .into_iter()
302 .map(|task| task.try_into())
303 .collect::<Result<Vec<_>>>()?;
304
305 Task::CreateTableTasks(PbCreateTableTasks { tasks })
306 }
307 DdlTask::DropLogicalTables(tasks) => {
308 let tasks = tasks
309 .into_iter()
310 .map(|task| task.into())
311 .collect::<Vec<_>>();
312
313 Task::DropTableTasks(PbDropTableTasks { tasks })
314 }
315 DdlTask::AlterLogicalTables(tasks) => {
316 let tasks = tasks
317 .into_iter()
318 .map(|task| task.try_into())
319 .collect::<Result<Vec<_>>>()?;
320
321 Task::AlterTableTasks(PbAlterTableTasks { tasks })
322 }
323 DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
324 DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
325 DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
326 DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
327 DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
328 DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
329 DdlTask::DropView(task) => Task::DropViewTask(task.into()),
330 #[cfg(feature = "enterprise")]
331 DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
332 #[cfg(feature = "enterprise")]
333 DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
334 };
335
336 Ok(Self {
337 header: None,
338 query_context: Some((*request.query_context).clone().into()),
339 task: Some(task),
340 })
341 }
342}
343
/// Response to a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    // Taken from the `key` of the protobuf response's `pid`; empty when no `pid` was present.
    pub key: Vec<u8>,
    pub table_ids: Vec<TableId>,
}
350
351impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
352 type Error = error::Error;
353
354 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
355 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
356 Ok(Self {
357 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
358 table_ids,
359 })
360 }
361}
362
363impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
364 fn from(val: SubmitDdlTaskResponse) -> Self {
365 Self {
366 pid: Some(ProcedureId { key: val.key }),
367 table_ids: val
368 .table_ids
369 .into_iter()
370 .map(|id| api::v1::TableId { id })
371 .collect(),
372 ..Default::default()
373 }
374 }
375}
376
/// Task payload for creating a view: the create-view expression plus the
/// view's raw table info.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    pub create_view: CreateViewExpr,
    pub view_info: RawTableInfo,
}
383
384impl CreateViewTask {
385 pub fn table_ref(&self) -> TableReference {
387 TableReference {
388 catalog: &self.create_view.catalog_name,
389 schema: &self.create_view.schema_name,
390 table: &self.create_view.view_name,
391 }
392 }
393
394 pub fn raw_logical_plan(&self) -> &Vec<u8> {
396 &self.create_view.logical_plan
397 }
398
399 pub fn view_definition(&self) -> &str {
401 &self.create_view.definition
402 }
403
404 pub fn table_names(&self) -> HashSet<TableName> {
406 self.create_view
407 .table_names
408 .iter()
409 .map(|t| t.clone().into())
410 .collect()
411 }
412
413 pub fn columns(&self) -> &Vec<String> {
415 &self.create_view.columns
416 }
417
418 pub fn plan_columns(&self) -> &Vec<String> {
420 &self.create_view.plan_columns
421 }
422}
423
424impl TryFrom<PbCreateViewTask> for CreateViewTask {
425 type Error = error::Error;
426
427 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
428 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
429
430 Ok(CreateViewTask {
431 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
432 err_msg: "expected create view",
433 })?,
434 view_info,
435 })
436 }
437}
438
439impl TryFrom<CreateViewTask> for PbCreateViewTask {
440 type Error = error::Error;
441
442 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
443 Ok(PbCreateViewTask {
444 create_view: Some(task.create_view),
445 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
446 })
447 }
448}
449
450impl Serialize for CreateViewTask {
451 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
452 where
453 S: serde::Serializer,
454 {
455 let view_info = serde_json::to_vec(&self.view_info)
456 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
457
458 let pb = PbCreateViewTask {
459 create_view: Some(self.create_view.clone()),
460 view_info,
461 };
462 let buf = pb.encode_to_vec();
463 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
464 serializer.serialize_str(&encoded)
465 }
466}
467
468impl<'de> Deserialize<'de> for CreateViewTask {
469 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
470 where
471 D: serde::Deserializer<'de>,
472 {
473 let encoded = String::deserialize(deserializer)?;
474 let buf = general_purpose::STANDARD_NO_PAD
475 .decode(encoded)
476 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
478 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480 let expr = CreateViewTask::try_from(expr)
481 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
482
483 Ok(expr)
484 }
485}
486
/// Task payload for dropping a view, identified by name and by id.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    pub drop_if_exists: bool,
}
496
497impl DropViewTask {
498 pub fn table_ref(&self) -> TableReference {
500 TableReference {
501 catalog: &self.catalog,
502 schema: &self.schema,
503 table: &self.view,
504 }
505 }
506}
507
508impl TryFrom<PbDropViewTask> for DropViewTask {
509 type Error = error::Error;
510
511 fn try_from(pb: PbDropViewTask) -> Result<Self> {
512 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
513 err_msg: "expected drop view",
514 })?;
515
516 Ok(DropViewTask {
517 catalog: expr.catalog_name,
518 schema: expr.schema_name,
519 view: expr.view_name,
520 view_id: expr
521 .view_id
522 .context(error::InvalidProtoMsgSnafu {
523 err_msg: "expected view_id",
524 })?
525 .id,
526 drop_if_exists: expr.drop_if_exists,
527 })
528 }
529}
530
531impl From<DropViewTask> for PbDropViewTask {
532 fn from(task: DropViewTask) -> Self {
533 PbDropViewTask {
534 drop_view: Some(DropViewExpr {
535 catalog_name: task.catalog,
536 schema_name: task.schema,
537 view_name: task.view,
538 view_id: Some(api::v1::TableId { id: task.view_id }),
539 drop_if_exists: task.drop_if_exists,
540 }),
541 }
542 }
543}
544
/// Task payload for dropping a table, identified by name and by id.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Falls back to `false` when the field is absent from serialized input.
    #[serde(default)]
    pub drop_if_exists: bool,
}
554
555impl DropTableTask {
556 pub fn table_ref(&self) -> TableReference {
557 TableReference {
558 catalog: &self.catalog,
559 schema: &self.schema,
560 table: &self.table,
561 }
562 }
563
564 pub fn table_name(&self) -> TableName {
565 TableName {
566 catalog_name: self.catalog.to_string(),
567 schema_name: self.schema.to_string(),
568 table_name: self.table.to_string(),
569 }
570 }
571}
572
573impl TryFrom<PbDropTableTask> for DropTableTask {
574 type Error = error::Error;
575
576 fn try_from(pb: PbDropTableTask) -> Result<Self> {
577 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
578 err_msg: "expected drop table",
579 })?;
580
581 Ok(Self {
582 catalog: drop_table.catalog_name,
583 schema: drop_table.schema_name,
584 table: drop_table.table_name,
585 table_id: drop_table
586 .table_id
587 .context(error::InvalidProtoMsgSnafu {
588 err_msg: "expected table_id",
589 })?
590 .id,
591 drop_if_exists: drop_table.drop_if_exists,
592 })
593 }
594}
595
596impl From<DropTableTask> for PbDropTableTask {
597 fn from(task: DropTableTask) -> Self {
598 PbDropTableTask {
599 drop_table: Some(DropTableExpr {
600 catalog_name: task.catalog,
601 schema_name: task.schema,
602 table_name: task.table,
603 table_id: Some(api::v1::TableId { id: task.table_id }),
604 drop_if_exists: task.drop_if_exists,
605 }),
606 }
607 }
608}
609
/// Task payload for creating a table: the create expression, its partitions
/// and the raw table info.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    pub partitions: Vec<Partition>,
    pub table_info: RawTableInfo,
}
616
617impl TryFrom<PbCreateTableTask> for CreateTableTask {
618 type Error = error::Error;
619
620 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
621 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
622
623 Ok(CreateTableTask::new(
624 pb.create_table.context(error::InvalidProtoMsgSnafu {
625 err_msg: "expected create table",
626 })?,
627 pb.partitions,
628 table_info,
629 ))
630 }
631}
632
633impl TryFrom<CreateTableTask> for PbCreateTableTask {
634 type Error = error::Error;
635
636 fn try_from(task: CreateTableTask) -> Result<Self> {
637 Ok(PbCreateTableTask {
638 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
639 create_table: Some(task.create_table),
640 partitions: task.partitions,
641 })
642 }
643}
644
645impl CreateTableTask {
646 pub fn new(
647 expr: CreateTableExpr,
648 partitions: Vec<Partition>,
649 table_info: RawTableInfo,
650 ) -> CreateTableTask {
651 CreateTableTask {
652 create_table: expr,
653 partitions,
654 table_info,
655 }
656 }
657
658 pub fn table_name(&self) -> TableName {
659 let table = &self.create_table;
660
661 TableName {
662 catalog_name: table.catalog_name.to_string(),
663 schema_name: table.schema_name.to_string(),
664 table_name: table.table_name.to_string(),
665 }
666 }
667
668 pub fn table_ref(&self) -> TableReference {
669 let table = &self.create_table;
670
671 TableReference {
672 catalog: &table.catalog_name,
673 schema: &table.schema_name,
674 table: &table.table_name,
675 }
676 }
677
678 pub fn set_table_id(&mut self, table_id: TableId) {
680 self.table_info.ident.table_id = table_id;
681 }
682
683 pub fn sort_columns(&mut self) {
688 self.create_table
691 .column_defs
692 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
693
694 self.table_info.sort_columns();
695 }
696}
697
698impl Serialize for CreateTableTask {
699 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
700 where
701 S: serde::Serializer,
702 {
703 let table_info = serde_json::to_vec(&self.table_info)
704 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
705
706 let pb = PbCreateTableTask {
707 create_table: Some(self.create_table.clone()),
708 partitions: self.partitions.clone(),
709 table_info,
710 };
711 let buf = pb.encode_to_vec();
712 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
713 serializer.serialize_str(&encoded)
714 }
715}
716
717impl<'de> Deserialize<'de> for CreateTableTask {
718 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
719 where
720 D: serde::Deserializer<'de>,
721 {
722 let encoded = String::deserialize(deserializer)?;
723 let buf = general_purpose::STANDARD_NO_PAD
724 .decode(encoded)
725 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
726 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
727 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
728
729 let expr = CreateTableTask::try_from(expr)
730 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
731
732 Ok(expr)
733 }
734}
735
/// Task payload for altering a table; wraps the alter-table expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}
741
742impl AlterTableTask {
743 pub fn validate(&self) -> Result<()> {
744 self.alter_table
745 .kind
746 .as_ref()
747 .context(error::UnexpectedSnafu {
748 err_msg: "'kind' is absent",
749 })?;
750 Ok(())
751 }
752
753 pub fn table_ref(&self) -> TableReference {
754 TableReference {
755 catalog: &self.alter_table.catalog_name,
756 schema: &self.alter_table.schema_name,
757 table: &self.alter_table.table_name,
758 }
759 }
760
761 pub fn table_name(&self) -> TableName {
762 let table = &self.alter_table;
763
764 TableName {
765 catalog_name: table.catalog_name.to_string(),
766 schema_name: table.schema_name.to_string(),
767 table_name: table.table_name.to_string(),
768 }
769 }
770}
771
772impl TryFrom<PbAlterTableTask> for AlterTableTask {
773 type Error = error::Error;
774
775 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
776 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
777 err_msg: "expected alter_table",
778 })?;
779
780 Ok(AlterTableTask { alter_table })
781 }
782}
783
784impl TryFrom<AlterTableTask> for PbAlterTableTask {
785 type Error = error::Error;
786
787 fn try_from(task: AlterTableTask) -> Result<Self> {
788 Ok(PbAlterTableTask {
789 alter_table: Some(task.alter_table),
790 })
791 }
792}
793
794impl Serialize for AlterTableTask {
795 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
796 where
797 S: serde::Serializer,
798 {
799 let pb = PbAlterTableTask {
800 alter_table: Some(self.alter_table.clone()),
801 };
802 let buf = pb.encode_to_vec();
803 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
804 serializer.serialize_str(&encoded)
805 }
806}
807
808impl<'de> Deserialize<'de> for AlterTableTask {
809 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
810 where
811 D: serde::Deserializer<'de>,
812 {
813 let encoded = String::deserialize(deserializer)?;
814 let buf = general_purpose::STANDARD_NO_PAD
815 .decode(encoded)
816 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
817 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
818 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
819
820 let expr = AlterTableTask::try_from(expr)
821 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
822
823 Ok(expr)
824 }
825}
826
/// Task payload for truncating a table, identified by name and by id.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Pairs of timestamps; empty when the protobuf message carried no time ranges.
    // NOTE(review): presumably each pair is a (start, end) range to truncate — confirm.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
835
836impl TruncateTableTask {
837 pub fn table_ref(&self) -> TableReference {
838 TableReference {
839 catalog: &self.catalog,
840 schema: &self.schema,
841 table: &self.table,
842 }
843 }
844
845 pub fn table_name(&self) -> TableName {
846 TableName {
847 catalog_name: self.catalog.to_string(),
848 schema_name: self.schema.to_string(),
849 table_name: self.table.to_string(),
850 }
851 }
852}
853
854impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
855 type Error = error::Error;
856
857 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
858 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
859 err_msg: "expected truncate table",
860 })?;
861
862 Ok(Self {
863 catalog: truncate_table.catalog_name,
864 schema: truncate_table.schema_name,
865 table: truncate_table.table_name,
866 table_id: truncate_table
867 .table_id
868 .context(error::InvalidProtoMsgSnafu {
869 err_msg: "expected table_id",
870 })?
871 .id,
872 time_ranges: truncate_table
873 .time_ranges
874 .map(from_pb_time_ranges)
875 .transpose()
876 .map_err(BoxedError::new)
877 .context(ExternalSnafu)?
878 .unwrap_or_default(),
879 })
880 }
881}
882
883impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
884 type Error = error::Error;
885
886 fn try_from(task: TruncateTableTask) -> Result<Self> {
887 Ok(PbTruncateTableTask {
888 truncate_table: Some(TruncateTableExpr {
889 catalog_name: task.catalog,
890 schema_name: task.schema,
891 table_name: task.table,
892 table_id: Some(api::v1::TableId { id: task.table_id }),
893 time_ranges: Some(
894 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
895 ),
896 }),
897 })
898 }
899}
900
/// Task payload for creating a database (schema) within a catalog.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    // `DefaultOnNull`: a serialized `null` is read back as the default (empty) map.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
910
911impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
912 type Error = error::Error;
913
914 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
915 let CreateDatabaseExpr {
916 catalog_name,
917 schema_name,
918 create_if_not_exists,
919 options,
920 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
921 err_msg: "expected create database",
922 })?;
923
924 Ok(CreateDatabaseTask {
925 catalog: catalog_name,
926 schema: schema_name,
927 create_if_not_exists,
928 options,
929 })
930 }
931}
932
933impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
934 type Error = error::Error;
935
936 fn try_from(
937 CreateDatabaseTask {
938 catalog,
939 schema,
940 create_if_not_exists,
941 options,
942 }: CreateDatabaseTask,
943 ) -> Result<Self> {
944 Ok(PbCreateDatabaseTask {
945 create_database: Some(CreateDatabaseExpr {
946 catalog_name: catalog,
947 schema_name: schema,
948 create_if_not_exists,
949 options,
950 }),
951 })
952 }
953}
954
/// Task payload for dropping a database (schema) within a catalog.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub drop_if_exists: bool,
}
961
962impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
963 type Error = error::Error;
964
965 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
966 let DropDatabaseExpr {
967 catalog_name,
968 schema_name,
969 drop_if_exists,
970 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
971 err_msg: "expected drop database",
972 })?;
973
974 Ok(DropDatabaseTask {
975 catalog: catalog_name,
976 schema: schema_name,
977 drop_if_exists,
978 })
979 }
980}
981
982impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
983 type Error = error::Error;
984
985 fn try_from(
986 DropDatabaseTask {
987 catalog,
988 schema,
989 drop_if_exists,
990 }: DropDatabaseTask,
991 ) -> Result<Self> {
992 Ok(PbDropDatabaseTask {
993 drop_database: Some(DropDatabaseExpr {
994 catalog_name: catalog,
995 schema_name: schema,
996 drop_if_exists,
997 }),
998 })
999 }
1000}
1001
/// Task payload for altering a database; wraps the alter-database expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1006
1007impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1008 type Error = error::Error;
1009
1010 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1011 Ok(PbAlterDatabaseTask {
1012 task: Some(task.alter_expr),
1013 })
1014 }
1015}
1016
1017impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1018 type Error = error::Error;
1019
1020 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1021 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1022 err_msg: "expected alter database",
1023 })?;
1024
1025 Ok(AlterDatabaseTask { alter_expr })
1026 }
1027}
1028
1029impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1030 type Error = error::Error;
1031
1032 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1033 match pb {
1034 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1035 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1036 options
1037 .set_database_options
1038 .into_iter()
1039 .map(SetDatabaseOption::try_from)
1040 .collect::<Result<Vec<_>>>()?,
1041 )))
1042 }
1043 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1044 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1045 options
1046 .keys
1047 .iter()
1048 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1049 .collect::<Result<Vec<_>>>()?,
1050 )),
1051 ),
1052 }
1053 }
1054}
1055
/// Option key matched (ASCII case-insensitively) when parsing database
/// options to set or unset.
const TTL_KEY: &str = "ttl";
1057
1058impl TryFrom<PbOption> for SetDatabaseOption {
1059 type Error = error::Error;
1060
1061 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1062 match key.to_ascii_lowercase().as_str() {
1063 TTL_KEY => {
1064 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1065 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1066
1067 Ok(SetDatabaseOption::Ttl(ttl))
1068 }
1069 _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1070 }
1071 }
1072}
1073
/// A database option to set, together with its parsed value.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    Ttl(DatabaseTimeToLive),
}
1078
/// A database option to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    Ttl,
}
1083
1084impl TryFrom<&str> for UnsetDatabaseOption {
1085 type Error = error::Error;
1086
1087 fn try_from(key: &str) -> Result<Self> {
1088 match key.to_ascii_lowercase().as_str() {
1089 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1090 _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1091 }
1092 }
1093}
1094
/// Newtype around the list of database options to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1097
/// Newtype around the list of database options to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1100
/// The kind of change an alter-database request makes: setting options or
/// unsetting them.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1106
1107impl AlterDatabaseTask {
1108 pub fn catalog(&self) -> &str {
1109 &self.alter_expr.catalog_name
1110 }
1111
1112 pub fn schema(&self) -> &str {
1113 &self.alter_expr.catalog_name
1114 }
1115}
1116
/// Task payload for creating a flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub source_table_names: Vec<TableName>,
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    // Mirrors the `value` of the protobuf `ExpireAfter`.
    // NOTE(review): unit is not visible here — presumably seconds; confirm.
    pub expire_after: Option<i64>,
    // Mirrors the `seconds` of the protobuf `EvalInterval`.
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1133
1134impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1135 type Error = error::Error;
1136
1137 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1138 let CreateFlowExpr {
1139 catalog_name,
1140 flow_name,
1141 source_table_names,
1142 sink_table_name,
1143 or_replace,
1144 create_if_not_exists,
1145 expire_after,
1146 eval_interval,
1147 comment,
1148 sql,
1149 flow_options,
1150 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1151 err_msg: "expected create_flow",
1152 })?;
1153
1154 Ok(CreateFlowTask {
1155 catalog_name,
1156 flow_name,
1157 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1158 sink_table_name: sink_table_name
1159 .context(error::InvalidProtoMsgSnafu {
1160 err_msg: "expected sink_table_name",
1161 })?
1162 .into(),
1163 or_replace,
1164 create_if_not_exists,
1165 expire_after: expire_after.map(|e| e.value),
1166 eval_interval_secs: eval_interval.map(|e| e.seconds),
1167 comment,
1168 sql,
1169 flow_options,
1170 })
1171 }
1172}
1173
1174impl From<CreateFlowTask> for PbCreateFlowTask {
1175 fn from(
1176 CreateFlowTask {
1177 catalog_name,
1178 flow_name,
1179 source_table_names,
1180 sink_table_name,
1181 or_replace,
1182 create_if_not_exists,
1183 expire_after,
1184 eval_interval_secs: eval_interval,
1185 comment,
1186 sql,
1187 flow_options,
1188 }: CreateFlowTask,
1189 ) -> Self {
1190 PbCreateFlowTask {
1191 create_flow: Some(CreateFlowExpr {
1192 catalog_name,
1193 flow_name,
1194 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1195 sink_table_name: Some(sink_table_name.into()),
1196 or_replace,
1197 create_if_not_exists,
1198 expire_after: expire_after.map(|value| ExpireAfter { value }),
1199 eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1200 comment,
1201 sql,
1202 flow_options,
1203 }),
1204 }
1205 }
1206}
1207
/// Task payload for dropping a flow, identified by name and by id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub flow_id: FlowId,
    pub drop_if_exists: bool,
}
1216
1217impl TryFrom<PbDropFlowTask> for DropFlowTask {
1218 type Error = error::Error;
1219
1220 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1221 let DropFlowExpr {
1222 catalog_name,
1223 flow_name,
1224 flow_id,
1225 drop_if_exists,
1226 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1227 err_msg: "expected drop_flow",
1228 })?;
1229 let flow_id = flow_id
1230 .context(error::InvalidProtoMsgSnafu {
1231 err_msg: "expected flow_id",
1232 })?
1233 .id;
1234 Ok(DropFlowTask {
1235 catalog_name,
1236 flow_name,
1237 flow_id,
1238 drop_if_exists,
1239 })
1240 }
1241}
1242
1243impl From<DropFlowTask> for PbDropFlowTask {
1244 fn from(
1245 DropFlowTask {
1246 catalog_name,
1247 flow_name,
1248 flow_id,
1249 drop_if_exists,
1250 }: DropFlowTask,
1251 ) -> Self {
1252 PbDropFlowTask {
1253 drop_flow: Some(DropFlowExpr {
1254 catalog_name,
1255 flow_name,
1256 flow_id: Some(api::v1::FlowId { id: flow_id }),
1257 drop_if_exists,
1258 }),
1259 }
1260 }
1261}
1262
1263#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1264pub struct QueryContext {
1265 pub(crate) current_catalog: String,
1266 pub(crate) current_schema: String,
1267 pub(crate) timezone: String,
1268 pub(crate) extensions: HashMap<String, String>,
1269 pub(crate) channel: u8,
1270}
1271
1272impl QueryContext {
1273 pub fn current_catalog(&self) -> &str {
1275 &self.current_catalog
1276 }
1277
1278 pub fn current_schema(&self) -> &str {
1280 &self.current_schema
1281 }
1282
1283 pub fn timezone(&self) -> &str {
1285 &self.timezone
1286 }
1287
1288 pub fn extensions(&self) -> &HashMap<String, String> {
1290 &self.extensions
1291 }
1292
1293 pub fn channel(&self) -> u8 {
1295 self.channel
1296 }
1297}
1298
1299#[derive(Debug, Clone, Serialize, PartialEq)]
1303pub struct FlowQueryContext {
1304 pub(crate) catalog: String,
1306 pub(crate) schema: String,
1308 pub(crate) timezone: String,
1310}
1311
1312impl<'de> Deserialize<'de> for FlowQueryContext {
1313 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1314 where
1315 D: serde::Deserializer<'de>,
1316 {
1317 #[derive(Deserialize)]
1319 #[serde(untagged)]
1320 enum ContextCompat {
1321 Flow(FlowQueryContextHelper),
1322 Full(QueryContext),
1323 }
1324
1325 #[derive(Deserialize)]
1326 struct FlowQueryContextHelper {
1327 catalog: String,
1328 schema: String,
1329 timezone: String,
1330 }
1331
1332 match ContextCompat::deserialize(deserializer)? {
1333 ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1334 catalog: helper.catalog,
1335 schema: helper.schema,
1336 timezone: helper.timezone,
1337 }),
1338 ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1339 }
1340 }
1341}
1342
1343impl From<QueryContextRef> for QueryContext {
1344 fn from(query_context: QueryContextRef) -> Self {
1345 QueryContext {
1346 current_catalog: query_context.current_catalog().to_string(),
1347 current_schema: query_context.current_schema().to_string(),
1348 timezone: query_context.timezone().to_string(),
1349 extensions: query_context.extensions(),
1350 channel: query_context.channel() as u8,
1351 }
1352 }
1353}
1354
1355impl TryFrom<QueryContext> for session::context::QueryContext {
1356 type Error = error::Error;
1357 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1358 Ok(QueryContextBuilder::default()
1359 .current_catalog(value.current_catalog)
1360 .current_schema(value.current_schema)
1361 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1362 .extensions(value.extensions)
1363 .channel((value.channel as u32).into())
1364 .build())
1365 }
1366}
1367
1368impl From<QueryContext> for PbQueryContext {
1369 fn from(
1370 QueryContext {
1371 current_catalog,
1372 current_schema,
1373 timezone,
1374 extensions,
1375 channel,
1376 }: QueryContext,
1377 ) -> Self {
1378 PbQueryContext {
1379 current_catalog,
1380 current_schema,
1381 timezone,
1382 extensions,
1383 channel: channel as u32,
1384 snapshot_seqs: None,
1385 explain: None,
1386 }
1387 }
1388}
1389
1390impl From<QueryContext> for FlowQueryContext {
1391 fn from(ctx: QueryContext) -> Self {
1392 Self {
1393 catalog: ctx.current_catalog,
1394 schema: ctx.current_schema,
1395 timezone: ctx.timezone,
1396 }
1397 }
1398}
1399
1400impl From<QueryContextRef> for FlowQueryContext {
1401 fn from(ctx: QueryContextRef) -> Self {
1402 Self {
1403 catalog: ctx.current_catalog().to_string(),
1404 schema: ctx.current_schema().to_string(),
1405 timezone: ctx.timezone().to_string(),
1406 }
1407 }
1408}
1409
1410impl From<FlowQueryContext> for QueryContext {
1411 fn from(flow_ctx: FlowQueryContext) -> Self {
1412 Self {
1413 current_catalog: flow_ctx.catalog,
1414 current_schema: flow_ctx.schema,
1415 timezone: flow_ctx.timezone,
1416 extensions: HashMap::new(),
1417 channel: 0, }
1419 }
1420}
1421
1422impl From<FlowQueryContext> for PbQueryContext {
1423 fn from(flow_ctx: FlowQueryContext) -> Self {
1424 let query_ctx: QueryContext = flow_ctx.into();
1425 query_ctx.into()
1426 }
1427}
1428
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    /// A `CreateTableTask` must survive a JSON round trip unchanged.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// An `AlterTableTask` must survive a JSON round trip unchanged.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// `sort_columns` must reorder the column definitions and keep the
    /// index-based metadata of the table info in sync.
    #[test]
    fn test_sort_columns() {
        // Columns are deliberately declared out of name order.
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // Column definitions are now ordered column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Index-based metadata follows the new column positions.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    /// Reducing a full context keeps catalog, schema and timezone.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    /// Expanding a flow context fills in defaults and round-trips losslessly.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    /// A session context reference converts directly into a flow context.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    /// The compact layout round-trips through JSON with the expected keys.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    /// Payloads written in the full `QueryContext` layout must still
    /// deserialize into a `FlowQueryContext`, and payloads matching neither
    /// layout must be rejected.
    #[test]
    fn test_flow_query_context_deserialization_from_full_query_context() {
        let query_ctx = QueryContext {
            current_catalog: "legacy_catalog".to_string(),
            current_schema: "legacy_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions: HashMap::from([("key".to_string(), "value".to_string())]),
            channel: 3,
        };

        let serialized = serde_json::to_string(&query_ctx).unwrap();
        let flow_ctx: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx.catalog, "legacy_catalog");
        assert_eq!(flow_ctx.schema, "legacy_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
        assert_eq!(flow_ctx, FlowQueryContext::from(query_ctx));

        // Missing `schema`/`timezone` (compact) and all full-layout fields.
        let incomplete = r#"{"catalog":"only_catalog"}"#;
        assert!(serde_json::from_str::<FlowQueryContext>(incomplete).is_err());
    }

    /// The protobuf conversion fills in defaults for everything a flow
    /// context does not carry.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}