1#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25 AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26 AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27 CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28 CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29 DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30 DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31 DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32 DropViewTask as PbDropViewTask, Partition, ProcedureId,
33 TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36 AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37 CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
38 Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::engine::general_purpose;
41use base64::Engine as _;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{serde_as, DefaultOnNull};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::table_name::TableName;
51use table::table_reference::TableReference;
52
53use crate::error::{
54 self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
55 InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
56};
57use crate::key::FlowId;
58
/// A distributed DDL task submitted to the procedure framework. Each variant
/// carries the payload required to execute the corresponding DDL statement.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    /// Creates a batch of logical tables in a single task.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Drops a batch of logical tables in a single task.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alters a batch of logical tables in a single task.
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    /// Trigger support is enterprise-only.
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    /// Trigger support is enterprise-only.
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
81
82impl DdlTask {
83 pub fn new_create_flow(expr: CreateFlowTask) -> Self {
85 DdlTask::CreateFlow(expr)
86 }
87
88 pub fn new_drop_flow(expr: DropFlowTask) -> Self {
90 DdlTask::DropFlow(expr)
91 }
92
93 pub fn new_drop_view(expr: DropViewTask) -> Self {
95 DdlTask::DropView(expr)
96 }
97
98 pub fn new_create_table(
100 expr: CreateTableExpr,
101 partitions: Vec<Partition>,
102 table_info: RawTableInfo,
103 ) -> Self {
104 DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
105 }
106
107 pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
109 DdlTask::CreateLogicalTables(
110 table_data
111 .into_iter()
112 .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
113 .collect(),
114 )
115 }
116
117 pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
119 DdlTask::AlterLogicalTables(
120 table_data
121 .into_iter()
122 .map(|alter_table| AlterTableTask { alter_table })
123 .collect(),
124 )
125 }
126
127 pub fn new_drop_table(
129 catalog: String,
130 schema: String,
131 table: String,
132 table_id: TableId,
133 drop_if_exists: bool,
134 ) -> Self {
135 DdlTask::DropTable(DropTableTask {
136 catalog,
137 schema,
138 table,
139 table_id,
140 drop_if_exists,
141 })
142 }
143
144 pub fn new_create_database(
146 catalog: String,
147 schema: String,
148 create_if_not_exists: bool,
149 options: HashMap<String, String>,
150 ) -> Self {
151 DdlTask::CreateDatabase(CreateDatabaseTask {
152 catalog,
153 schema,
154 create_if_not_exists,
155 options,
156 })
157 }
158
159 pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
161 DdlTask::DropDatabase(DropDatabaseTask {
162 catalog,
163 schema,
164 drop_if_exists,
165 })
166 }
167
168 pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
170 DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
171 }
172
173 pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
175 DdlTask::AlterTable(AlterTableTask { alter_table })
176 }
177
178 pub fn new_truncate_table(
180 catalog: String,
181 schema: String,
182 table: String,
183 table_id: TableId,
184 time_ranges: Vec<(Timestamp, Timestamp)>,
185 ) -> Self {
186 DdlTask::TruncateTable(TruncateTableTask {
187 catalog,
188 schema,
189 table,
190 table_id,
191 time_ranges,
192 })
193 }
194
195 pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
197 DdlTask::CreateView(CreateViewTask {
198 create_view,
199 view_info,
200 })
201 }
202}
203
/// Decodes a protobuf [`Task`] into a [`DdlTask`].
///
/// Batched protobuf wrappers (`*Tasks`) are converted element-wise into the
/// corresponding `*LogicalTables` variants, failing on the first invalid
/// sub-task. Trigger tasks are only available with the `enterprise` feature;
/// without it they return an "unsupported" error.
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                // Short-circuits on the first sub-task that fails to convert.
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            Task::CreateTriggerTask(create_trigger) => {
                // Enterprise builds convert the task; OSS builds reject it.
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    // Silence the unused-binding warning in OSS builds.
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    // Silence the unused-binding warning in OSS builds.
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
        }
    }
}
283
/// A DDL task to be submitted, paired with the query context of the session
/// that issued it.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    pub query_context: QueryContextRef,
    pub task: DdlTask,
}
289
/// Encodes a [`SubmitDdlTaskRequest`] into the protobuf request, wrapping the
/// task together with the caller's query context. Batched `*LogicalTables`
/// variants become the corresponding protobuf `*Tasks` wrappers.
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                // Short-circuits on the first sub-task that fails to convert.
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
        };

        Ok(Self {
            // NOTE(review): header is left empty here — presumably filled in
            // by the transport layer before sending; confirm at the call site.
            header: None,
            query_context: Some((*request.query_context).clone().into()),
            task: Some(task),
        })
    }
}
343
/// Result of submitting a DDL task: the procedure key identifying the
/// submitted procedure, plus any table ids produced by the task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Raw procedure id key bytes (empty when the response carried none).
    pub key: Vec<u8>,
    pub table_ids: Vec<TableId>,
}
350
351impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
352 type Error = error::Error;
353
354 fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
355 let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
356 Ok(Self {
357 key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
358 table_ids,
359 })
360 }
361}
362
363impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
364 fn from(val: SubmitDdlTaskResponse) -> Self {
365 Self {
366 pid: Some(ProcedureId { key: val.key }),
367 table_ids: val
368 .table_ids
369 .into_iter()
370 .map(|id| api::v1::TableId { id })
371 .collect(),
372 ..Default::default()
373 }
374 }
375}
376
/// Creates a view: the view expression plus the pre-built table info
/// describing the view.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    pub create_view: CreateViewExpr,
    pub view_info: RawTableInfo,
}
383
384impl CreateViewTask {
385 pub fn table_ref(&self) -> TableReference {
387 TableReference {
388 catalog: &self.create_view.catalog_name,
389 schema: &self.create_view.schema_name,
390 table: &self.create_view.view_name,
391 }
392 }
393
394 pub fn raw_logical_plan(&self) -> &Vec<u8> {
396 &self.create_view.logical_plan
397 }
398
399 pub fn view_definition(&self) -> &str {
401 &self.create_view.definition
402 }
403
404 pub fn table_names(&self) -> HashSet<TableName> {
406 self.create_view
407 .table_names
408 .iter()
409 .map(|t| t.clone().into())
410 .collect()
411 }
412
413 pub fn columns(&self) -> &Vec<String> {
415 &self.create_view.columns
416 }
417
418 pub fn plan_columns(&self) -> &Vec<String> {
420 &self.create_view.plan_columns
421 }
422}
423
424impl TryFrom<PbCreateViewTask> for CreateViewTask {
425 type Error = error::Error;
426
427 fn try_from(pb: PbCreateViewTask) -> Result<Self> {
428 let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
429
430 Ok(CreateViewTask {
431 create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
432 err_msg: "expected create view",
433 })?,
434 view_info,
435 })
436 }
437}
438
439impl TryFrom<CreateViewTask> for PbCreateViewTask {
440 type Error = error::Error;
441
442 fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
443 Ok(PbCreateViewTask {
444 create_view: Some(task.create_view),
445 view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
446 })
447 }
448}
449
450impl Serialize for CreateViewTask {
451 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
452 where
453 S: serde::Serializer,
454 {
455 let view_info = serde_json::to_vec(&self.view_info)
456 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
457
458 let pb = PbCreateViewTask {
459 create_view: Some(self.create_view.clone()),
460 view_info,
461 };
462 let buf = pb.encode_to_vec();
463 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
464 serializer.serialize_str(&encoded)
465 }
466}
467
468impl<'de> Deserialize<'de> for CreateViewTask {
469 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
470 where
471 D: serde::Deserializer<'de>,
472 {
473 let encoded = String::deserialize(deserializer)?;
474 let buf = general_purpose::STANDARD_NO_PAD
475 .decode(encoded)
476 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477 let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
478 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480 let expr = CreateViewTask::try_from(expr)
481 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
482
483 Ok(expr)
484 }
485}
486
/// Drops a view, identified by its fully-qualified name and resolved view id.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    pub drop_if_exists: bool,
}
496
497impl DropViewTask {
498 pub fn table_ref(&self) -> TableReference {
500 TableReference {
501 catalog: &self.catalog,
502 schema: &self.schema,
503 table: &self.view,
504 }
505 }
506}
507
508impl TryFrom<PbDropViewTask> for DropViewTask {
509 type Error = error::Error;
510
511 fn try_from(pb: PbDropViewTask) -> Result<Self> {
512 let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
513 err_msg: "expected drop view",
514 })?;
515
516 Ok(DropViewTask {
517 catalog: expr.catalog_name,
518 schema: expr.schema_name,
519 view: expr.view_name,
520 view_id: expr
521 .view_id
522 .context(error::InvalidProtoMsgSnafu {
523 err_msg: "expected view_id",
524 })?
525 .id,
526 drop_if_exists: expr.drop_if_exists,
527 })
528 }
529}
530
531impl From<DropViewTask> for PbDropViewTask {
532 fn from(task: DropViewTask) -> Self {
533 PbDropViewTask {
534 drop_view: Some(DropViewExpr {
535 catalog_name: task.catalog,
536 schema_name: task.schema,
537 view_name: task.view,
538 view_id: Some(api::v1::TableId { id: task.view_id }),
539 drop_if_exists: task.drop_if_exists,
540 }),
541 }
542 }
543}
544
/// Drops a table, identified by its fully-qualified name and resolved table id.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Defaults to `false` when absent in older serialized payloads.
    #[serde(default)]
    pub drop_if_exists: bool,
}
554
555impl DropTableTask {
556 pub fn table_ref(&self) -> TableReference {
557 TableReference {
558 catalog: &self.catalog,
559 schema: &self.schema,
560 table: &self.table,
561 }
562 }
563
564 pub fn table_name(&self) -> TableName {
565 TableName {
566 catalog_name: self.catalog.to_string(),
567 schema_name: self.schema.to_string(),
568 table_name: self.table.to_string(),
569 }
570 }
571}
572
573impl TryFrom<PbDropTableTask> for DropTableTask {
574 type Error = error::Error;
575
576 fn try_from(pb: PbDropTableTask) -> Result<Self> {
577 let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
578 err_msg: "expected drop table",
579 })?;
580
581 Ok(Self {
582 catalog: drop_table.catalog_name,
583 schema: drop_table.schema_name,
584 table: drop_table.table_name,
585 table_id: drop_table
586 .table_id
587 .context(error::InvalidProtoMsgSnafu {
588 err_msg: "expected table_id",
589 })?
590 .id,
591 drop_if_exists: drop_table.drop_if_exists,
592 })
593 }
594}
595
596impl From<DropTableTask> for PbDropTableTask {
597 fn from(task: DropTableTask) -> Self {
598 PbDropTableTask {
599 drop_table: Some(DropTableExpr {
600 catalog_name: task.catalog,
601 schema_name: task.schema,
602 table_name: task.table,
603 table_id: Some(api::v1::TableId { id: task.table_id }),
604 drop_if_exists: task.drop_if_exists,
605 }),
606 }
607 }
608}
609
/// Creates a table: the create expression, its partition rules, and the
/// pre-built table info.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    pub create_table: CreateTableExpr,
    pub partitions: Vec<Partition>,
    pub table_info: RawTableInfo,
}
616
617impl TryFrom<PbCreateTableTask> for CreateTableTask {
618 type Error = error::Error;
619
620 fn try_from(pb: PbCreateTableTask) -> Result<Self> {
621 let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
622
623 Ok(CreateTableTask::new(
624 pb.create_table.context(error::InvalidProtoMsgSnafu {
625 err_msg: "expected create table",
626 })?,
627 pb.partitions,
628 table_info,
629 ))
630 }
631}
632
633impl TryFrom<CreateTableTask> for PbCreateTableTask {
634 type Error = error::Error;
635
636 fn try_from(task: CreateTableTask) -> Result<Self> {
637 Ok(PbCreateTableTask {
638 table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
639 create_table: Some(task.create_table),
640 partitions: task.partitions,
641 })
642 }
643}
644
645impl CreateTableTask {
646 pub fn new(
647 expr: CreateTableExpr,
648 partitions: Vec<Partition>,
649 table_info: RawTableInfo,
650 ) -> CreateTableTask {
651 CreateTableTask {
652 create_table: expr,
653 partitions,
654 table_info,
655 }
656 }
657
658 pub fn table_name(&self) -> TableName {
659 let table = &self.create_table;
660
661 TableName {
662 catalog_name: table.catalog_name.to_string(),
663 schema_name: table.schema_name.to_string(),
664 table_name: table.table_name.to_string(),
665 }
666 }
667
668 pub fn table_ref(&self) -> TableReference {
669 let table = &self.create_table;
670
671 TableReference {
672 catalog: &table.catalog_name,
673 schema: &table.schema_name,
674 table: &table.table_name,
675 }
676 }
677
678 pub fn set_table_id(&mut self, table_id: TableId) {
680 self.table_info.ident.table_id = table_id;
681 }
682
683 pub fn sort_columns(&mut self) {
688 self.create_table
691 .column_defs
692 .sort_unstable_by(|a, b| a.name.cmp(&b.name));
693
694 self.table_info.sort_columns();
695 }
696}
697
698impl Serialize for CreateTableTask {
699 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
700 where
701 S: serde::Serializer,
702 {
703 let table_info = serde_json::to_vec(&self.table_info)
704 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
705
706 let pb = PbCreateTableTask {
707 create_table: Some(self.create_table.clone()),
708 partitions: self.partitions.clone(),
709 table_info,
710 };
711 let buf = pb.encode_to_vec();
712 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
713 serializer.serialize_str(&encoded)
714 }
715}
716
717impl<'de> Deserialize<'de> for CreateTableTask {
718 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
719 where
720 D: serde::Deserializer<'de>,
721 {
722 let encoded = String::deserialize(deserializer)?;
723 let buf = general_purpose::STANDARD_NO_PAD
724 .decode(encoded)
725 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
726 let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
727 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
728
729 let expr = CreateTableTask::try_from(expr)
730 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
731
732 Ok(expr)
733 }
734}
735
/// Alters a table; the whole payload is the alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    pub alter_table: AlterTableExpr,
}
741
742impl AlterTableTask {
743 pub fn validate(&self) -> Result<()> {
744 self.alter_table
745 .kind
746 .as_ref()
747 .context(error::UnexpectedSnafu {
748 err_msg: "'kind' is absent",
749 })?;
750 Ok(())
751 }
752
753 pub fn table_ref(&self) -> TableReference {
754 TableReference {
755 catalog: &self.alter_table.catalog_name,
756 schema: &self.alter_table.schema_name,
757 table: &self.alter_table.table_name,
758 }
759 }
760
761 pub fn table_name(&self) -> TableName {
762 let table = &self.alter_table;
763
764 TableName {
765 catalog_name: table.catalog_name.to_string(),
766 schema_name: table.schema_name.to_string(),
767 table_name: table.table_name.to_string(),
768 }
769 }
770}
771
772impl TryFrom<PbAlterTableTask> for AlterTableTask {
773 type Error = error::Error;
774
775 fn try_from(pb: PbAlterTableTask) -> Result<Self> {
776 let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
777 err_msg: "expected alter_table",
778 })?;
779
780 Ok(AlterTableTask { alter_table })
781 }
782}
783
784impl TryFrom<AlterTableTask> for PbAlterTableTask {
785 type Error = error::Error;
786
787 fn try_from(task: AlterTableTask) -> Result<Self> {
788 Ok(PbAlterTableTask {
789 alter_table: Some(task.alter_table),
790 })
791 }
792}
793
794impl Serialize for AlterTableTask {
795 fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
796 where
797 S: serde::Serializer,
798 {
799 let pb = PbAlterTableTask {
800 alter_table: Some(self.alter_table.clone()),
801 };
802 let buf = pb.encode_to_vec();
803 let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
804 serializer.serialize_str(&encoded)
805 }
806}
807
808impl<'de> Deserialize<'de> for AlterTableTask {
809 fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
810 where
811 D: serde::Deserializer<'de>,
812 {
813 let encoded = String::deserialize(deserializer)?;
814 let buf = general_purpose::STANDARD_NO_PAD
815 .decode(encoded)
816 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
817 let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
818 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
819
820 let expr = AlterTableTask::try_from(expr)
821 .map_err(|err| serde::de::Error::custom(err.to_string()))?;
822
823 Ok(expr)
824 }
825}
826
/// Truncates a table, optionally restricted to the given time ranges
/// (an empty vector means the whole table).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
835
836impl TruncateTableTask {
837 pub fn table_ref(&self) -> TableReference {
838 TableReference {
839 catalog: &self.catalog,
840 schema: &self.schema,
841 table: &self.table,
842 }
843 }
844
845 pub fn table_name(&self) -> TableName {
846 TableName {
847 catalog_name: self.catalog.to_string(),
848 schema_name: self.schema.to_string(),
849 table_name: self.table.to_string(),
850 }
851 }
852}
853
854impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
855 type Error = error::Error;
856
857 fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
858 let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
859 err_msg: "expected truncate table",
860 })?;
861
862 Ok(Self {
863 catalog: truncate_table.catalog_name,
864 schema: truncate_table.schema_name,
865 table: truncate_table.table_name,
866 table_id: truncate_table
867 .table_id
868 .context(error::InvalidProtoMsgSnafu {
869 err_msg: "expected table_id",
870 })?
871 .id,
872 time_ranges: truncate_table
873 .time_ranges
874 .map(from_pb_time_ranges)
875 .transpose()
876 .map_err(BoxedError::new)
877 .context(ExternalSnafu)?
878 .unwrap_or_default(),
879 })
880 }
881}
882
883impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
884 type Error = error::Error;
885
886 fn try_from(task: TruncateTableTask) -> Result<Self> {
887 Ok(PbTruncateTableTask {
888 truncate_table: Some(TruncateTableExpr {
889 catalog_name: task.catalog,
890 schema_name: task.schema,
891 table_name: task.table,
892 table_id: Some(api::v1::TableId { id: task.table_id }),
893 time_ranges: Some(
894 to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
895 ),
896 }),
897 })
898 }
899}
900
/// Creates a database (schema) with key/value options.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    // Tolerates `null` options in older serialized payloads.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
910
911impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
912 type Error = error::Error;
913
914 fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
915 let CreateDatabaseExpr {
916 catalog_name,
917 schema_name,
918 create_if_not_exists,
919 options,
920 } = pb.create_database.context(error::InvalidProtoMsgSnafu {
921 err_msg: "expected create database",
922 })?;
923
924 Ok(CreateDatabaseTask {
925 catalog: catalog_name,
926 schema: schema_name,
927 create_if_not_exists,
928 options,
929 })
930 }
931}
932
933impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
934 type Error = error::Error;
935
936 fn try_from(
937 CreateDatabaseTask {
938 catalog,
939 schema,
940 create_if_not_exists,
941 options,
942 }: CreateDatabaseTask,
943 ) -> Result<Self> {
944 Ok(PbCreateDatabaseTask {
945 create_database: Some(CreateDatabaseExpr {
946 catalog_name: catalog,
947 schema_name: schema,
948 create_if_not_exists,
949 options,
950 }),
951 })
952 }
953}
954
/// Drops a database (schema).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub drop_if_exists: bool,
}
961
962impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
963 type Error = error::Error;
964
965 fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
966 let DropDatabaseExpr {
967 catalog_name,
968 schema_name,
969 drop_if_exists,
970 } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
971 err_msg: "expected drop database",
972 })?;
973
974 Ok(DropDatabaseTask {
975 catalog: catalog_name,
976 schema: schema_name,
977 drop_if_exists,
978 })
979 }
980}
981
982impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
983 type Error = error::Error;
984
985 fn try_from(
986 DropDatabaseTask {
987 catalog,
988 schema,
989 drop_if_exists,
990 }: DropDatabaseTask,
991 ) -> Result<Self> {
992 Ok(PbDropDatabaseTask {
993 drop_database: Some(DropDatabaseExpr {
994 catalog_name: catalog,
995 schema_name: schema,
996 drop_if_exists,
997 }),
998 })
999 }
1000}
1001
/// Alters a database (schema); the whole payload is the alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1006
1007impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1008 type Error = error::Error;
1009
1010 fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1011 Ok(PbAlterDatabaseTask {
1012 task: Some(task.alter_expr),
1013 })
1014 }
1015}
1016
1017impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1018 type Error = error::Error;
1019
1020 fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1021 let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1022 err_msg: "expected alter database",
1023 })?;
1024
1025 Ok(AlterDatabaseTask { alter_expr })
1026 }
1027}
1028
1029impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1030 type Error = error::Error;
1031
1032 fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1033 match pb {
1034 PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1035 Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1036 options
1037 .set_database_options
1038 .into_iter()
1039 .map(SetDatabaseOption::try_from)
1040 .collect::<Result<Vec<_>>>()?,
1041 )))
1042 }
1043 PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1044 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1045 options
1046 .keys
1047 .iter()
1048 .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1049 .collect::<Result<Vec<_>>>()?,
1050 )),
1051 ),
1052 }
1053 }
1054}
1055
/// Database option key for the time-to-live setting.
const TTL_KEY: &str = "ttl";
1057
1058impl TryFrom<PbOption> for SetDatabaseOption {
1059 type Error = error::Error;
1060
1061 fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1062 match key.to_ascii_lowercase().as_str() {
1063 TTL_KEY => {
1064 let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1065 .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1066
1067 Ok(SetDatabaseOption::Ttl(ttl))
1068 }
1069 _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1070 }
1071 }
1072}
1073
/// A typed, validated database option to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    Ttl(DatabaseTimeToLive),
}
1078
/// A typed database option to unset (reset to default).
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    Ttl,
}
1083
1084impl TryFrom<&str> for UnsetDatabaseOption {
1085 type Error = error::Error;
1086
1087 fn try_from(key: &str) -> Result<Self> {
1088 match key.to_ascii_lowercase().as_str() {
1089 TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1090 _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1091 }
1092 }
1093}
1094
/// Newtype over the list of options to set on a database.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1097
/// Newtype over the list of options to unset on a database.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1100
/// The kind of alteration applied to a database: set or unset options.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1106
1107impl AlterDatabaseTask {
1108 pub fn catalog(&self) -> &str {
1109 &self.alter_expr.catalog_name
1110 }
1111
1112 pub fn schema(&self) -> &str {
1113 &self.alter_expr.catalog_name
1114 }
1115}
1116
/// Creates a flow: a continuous query reading from source tables and writing
/// into a sink table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub source_table_names: Vec<TableName>,
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    // Raw value of the `ExpireAfter` proto; unit not visible here —
    // NOTE(review): confirm semantics against the api crate.
    pub expire_after: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1132
1133impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1134 type Error = error::Error;
1135
1136 fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1137 let CreateFlowExpr {
1138 catalog_name,
1139 flow_name,
1140 source_table_names,
1141 sink_table_name,
1142 or_replace,
1143 create_if_not_exists,
1144 expire_after,
1145 comment,
1146 sql,
1147 flow_options,
1148 } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1149 err_msg: "expected create_flow",
1150 })?;
1151
1152 Ok(CreateFlowTask {
1153 catalog_name,
1154 flow_name,
1155 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1156 sink_table_name: sink_table_name
1157 .context(error::InvalidProtoMsgSnafu {
1158 err_msg: "expected sink_table_name",
1159 })?
1160 .into(),
1161 or_replace,
1162 create_if_not_exists,
1163 expire_after: expire_after.map(|e| e.value),
1164 comment,
1165 sql,
1166 flow_options,
1167 })
1168 }
1169}
1170
1171impl From<CreateFlowTask> for PbCreateFlowTask {
1172 fn from(
1173 CreateFlowTask {
1174 catalog_name,
1175 flow_name,
1176 source_table_names,
1177 sink_table_name,
1178 or_replace,
1179 create_if_not_exists,
1180 expire_after,
1181 comment,
1182 sql,
1183 flow_options,
1184 }: CreateFlowTask,
1185 ) -> Self {
1186 PbCreateFlowTask {
1187 create_flow: Some(CreateFlowExpr {
1188 catalog_name,
1189 flow_name,
1190 source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1191 sink_table_name: Some(sink_table_name.into()),
1192 or_replace,
1193 create_if_not_exists,
1194 expire_after: expire_after.map(|value| ExpireAfter { value }),
1195 comment,
1196 sql,
1197 flow_options,
1198 }),
1199 }
1200 }
1201}
1202
/// Drops a flow, identified by its name and resolved flow id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub flow_id: FlowId,
    pub drop_if_exists: bool,
}
1211
1212impl TryFrom<PbDropFlowTask> for DropFlowTask {
1213 type Error = error::Error;
1214
1215 fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1216 let DropFlowExpr {
1217 catalog_name,
1218 flow_name,
1219 flow_id,
1220 drop_if_exists,
1221 } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1222 err_msg: "expected drop_flow",
1223 })?;
1224 let flow_id = flow_id
1225 .context(error::InvalidProtoMsgSnafu {
1226 err_msg: "expected flow_id",
1227 })?
1228 .id;
1229 Ok(DropFlowTask {
1230 catalog_name,
1231 flow_name,
1232 flow_id,
1233 drop_if_exists,
1234 })
1235 }
1236}
1237
1238impl From<DropFlowTask> for PbDropFlowTask {
1239 fn from(
1240 DropFlowTask {
1241 catalog_name,
1242 flow_name,
1243 flow_id,
1244 drop_if_exists,
1245 }: DropFlowTask,
1246 ) -> Self {
1247 PbDropFlowTask {
1248 drop_flow: Some(DropFlowExpr {
1249 catalog_name,
1250 flow_name,
1251 flow_id: Some(api::v1::FlowId { id: flow_id }),
1252 drop_if_exists,
1253 }),
1254 }
1255 }
1256}
1257
/// Serializable snapshot of a session query context, carried inside DDL
/// tasks so the executing side can rebuild the caller's session settings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    /// Catalog the statement was issued against.
    pub(crate) current_catalog: String,
    /// Schema (database) the statement was issued against.
    pub(crate) current_schema: String,
    /// Timezone as a tz string (e.g. "UTC"), parsed back via
    /// `Timezone::from_tz_string` on conversion.
    pub(crate) timezone: String,
    /// Free-form key/value extensions copied from the session.
    pub(crate) extensions: HashMap<String, String>,
    /// Client channel id, stored narrowed to a `u8` and widened back to
    /// `u32` when converting to session/protobuf contexts.
    pub(crate) channel: u8,
}
1266
1267impl QueryContext {
1268 pub fn current_catalog(&self) -> &str {
1270 &self.current_catalog
1271 }
1272
1273 pub fn current_schema(&self) -> &str {
1275 &self.current_schema
1276 }
1277
1278 pub fn timezone(&self) -> &str {
1280 &self.timezone
1281 }
1282
1283 pub fn extensions(&self) -> &HashMap<String, String> {
1285 &self.extensions
1286 }
1287
1288 pub fn channel(&self) -> u8 {
1290 self.channel
1291 }
1292}
1293
/// Minimal query context persisted with a flow: only the fields a flow
/// needs (catalog, schema, timezone).
///
/// `Deserialize` is implemented by hand to stay compatible with values
/// serialized in the full `QueryContext` format.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    /// Catalog the flow runs in.
    pub(crate) catalog: String,
    /// Schema (database) the flow runs in.
    pub(crate) schema: String,
    /// Timezone as a tz string.
    pub(crate) timezone: String,
}
1306
impl<'de> Deserialize<'de> for FlowQueryContext {
    /// Hand-written deserialization that accepts both the compact flow
    /// format (`catalog`/`schema`/`timezone`) and the full `QueryContext`
    /// format (`current_catalog`/`current_schema`/...), converting the
    /// latter via `From<QueryContext>`. This keeps previously persisted
    /// full contexts readable.
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries variants in declaration order, so the
        // compact flow shape is attempted before the legacy full shape.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Mirror of `FlowQueryContext` used only as a deserialization target
        // (the real type has no derived `Deserialize`).
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1337
1338impl From<QueryContextRef> for QueryContext {
1339 fn from(query_context: QueryContextRef) -> Self {
1340 QueryContext {
1341 current_catalog: query_context.current_catalog().to_string(),
1342 current_schema: query_context.current_schema().to_string(),
1343 timezone: query_context.timezone().to_string(),
1344 extensions: query_context.extensions(),
1345 channel: query_context.channel() as u8,
1346 }
1347 }
1348}
1349
1350impl TryFrom<QueryContext> for session::context::QueryContext {
1351 type Error = error::Error;
1352 fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1353 Ok(QueryContextBuilder::default()
1354 .current_catalog(value.current_catalog)
1355 .current_schema(value.current_schema)
1356 .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1357 .extensions(value.extensions)
1358 .channel((value.channel as u32).into())
1359 .build())
1360 }
1361}
1362
1363impl From<QueryContext> for PbQueryContext {
1364 fn from(
1365 QueryContext {
1366 current_catalog,
1367 current_schema,
1368 timezone,
1369 extensions,
1370 channel,
1371 }: QueryContext,
1372 ) -> Self {
1373 PbQueryContext {
1374 current_catalog,
1375 current_schema,
1376 timezone,
1377 extensions,
1378 channel: channel as u32,
1379 snapshot_seqs: None,
1380 explain: None,
1381 }
1382 }
1383}
1384
1385impl From<QueryContext> for FlowQueryContext {
1386 fn from(ctx: QueryContext) -> Self {
1387 Self {
1388 catalog: ctx.current_catalog,
1389 schema: ctx.current_schema,
1390 timezone: ctx.timezone,
1391 }
1392 }
1393}
1394
1395impl From<QueryContextRef> for FlowQueryContext {
1396 fn from(ctx: QueryContextRef) -> Self {
1397 Self {
1398 catalog: ctx.current_catalog().to_string(),
1399 schema: ctx.current_schema().to_string(),
1400 timezone: ctx.timezone().to_string(),
1401 }
1402 }
1403}
1404
1405impl From<FlowQueryContext> for QueryContext {
1406 fn from(flow_ctx: FlowQueryContext) -> Self {
1407 Self {
1408 current_catalog: flow_ctx.catalog,
1409 current_schema: flow_ctx.schema,
1410 timezone: flow_ctx.timezone,
1411 extensions: HashMap::new(),
1412 channel: 0, }
1414 }
1415}
1416
1417impl From<FlowQueryContext> for PbQueryContext {
1418 fn from(flow_ctx: FlowQueryContext) -> Self {
1419 let query_ctx: QueryContext = flow_ctx.into();
1420 query_ctx.into()
1421 }
1422}
1423
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    // CreateTableTask must survive a JSON serialize/deserialize round-trip.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // AlterTableTask must survive a JSON serialize/deserialize round-trip.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // `sort_columns` should reorder the column defs and remap the schema's
    // timestamp/primary-key/value indices to the columns' new positions.
    #[test]
    fn test_sort_columns() {
        // Columns deliberately out of order: column3 (tag), column1 (time
        // index), column2 (field).
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        create_table_task.sort_columns();

        // After sorting the defs appear as column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Indices follow the columns to their new positions.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    // QueryContext -> FlowQueryContext keeps catalog/schema/timezone and
    // drops extensions and channel.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    // FlowQueryContext -> QueryContext fills the untracked fields with
    // defaults, and the conversion round-trips without loss.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    // A live session context converts directly into a FlowQueryContext.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    // FlowQueryContext serializes to the compact JSON shape and round-trips
    // through its hand-written Deserialize impl.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        // The JSON uses the compact field names, not the full-context ones.
        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    // FlowQueryContext -> PbQueryContext fills protobuf-only fields with
    // defaults (empty extensions, channel 0, no snapshot/explain).
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}