common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32    DropViewTask as PbDropViewTask, Partition, ProcedureId,
33    TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
38    ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::Engine as _;
41use base64::engine::general_purpose;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{DefaultOnNull, serde_as};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::requests::validate_database_option;
51use table::table_name::TableName;
52use table::table_reference::TableReference;
53
54use crate::error::{
55    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
56    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
57};
58use crate::key::FlowId;
59
/// DDL tasks
///
/// Each variant carries the payload of one kind of DDL operation. The
/// `*LogicalTables` variants hold a batch of per-table tasks, and the trigger
/// variants are only compiled in when the `enterprise` feature is enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a single table.
    CreateTable(CreateTableTask),
    /// Drop a single table.
    DropTable(DropTableTask),
    /// Alter a single table.
    AlterTable(AlterTableTask),
    /// Truncate a table.
    TruncateTable(TruncateTableTask),
    /// Create several logical tables at once.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Drop several logical tables at once.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alter several logical tables at once.
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter a database.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Drop a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
    /// Create a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
82
83impl DdlTask {
84    /// Creates a [`DdlTask`] to create a flow.
85    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
86        DdlTask::CreateFlow(expr)
87    }
88
89    /// Creates a [`DdlTask`] to drop a flow.
90    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
91        DdlTask::DropFlow(expr)
92    }
93
94    /// Creates a [`DdlTask`] to drop a view.
95    pub fn new_drop_view(expr: DropViewTask) -> Self {
96        DdlTask::DropView(expr)
97    }
98
99    /// Creates a [`DdlTask`] to create a table.
100    pub fn new_create_table(
101        expr: CreateTableExpr,
102        partitions: Vec<Partition>,
103        table_info: RawTableInfo,
104    ) -> Self {
105        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
106    }
107
108    /// Creates a [`DdlTask`] to create several logical tables.
109    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
110        DdlTask::CreateLogicalTables(
111            table_data
112                .into_iter()
113                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
114                .collect(),
115        )
116    }
117
118    /// Creates a [`DdlTask`] to alter several logical tables.
119    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
120        DdlTask::AlterLogicalTables(
121            table_data
122                .into_iter()
123                .map(|alter_table| AlterTableTask { alter_table })
124                .collect(),
125        )
126    }
127
128    /// Creates a [`DdlTask`] to drop a table.
129    pub fn new_drop_table(
130        catalog: String,
131        schema: String,
132        table: String,
133        table_id: TableId,
134        drop_if_exists: bool,
135    ) -> Self {
136        DdlTask::DropTable(DropTableTask {
137            catalog,
138            schema,
139            table,
140            table_id,
141            drop_if_exists,
142        })
143    }
144
145    /// Creates a [`DdlTask`] to create a database.
146    pub fn new_create_database(
147        catalog: String,
148        schema: String,
149        create_if_not_exists: bool,
150        options: HashMap<String, String>,
151    ) -> Self {
152        DdlTask::CreateDatabase(CreateDatabaseTask {
153            catalog,
154            schema,
155            create_if_not_exists,
156            options,
157        })
158    }
159
160    /// Creates a [`DdlTask`] to drop a database.
161    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
162        DdlTask::DropDatabase(DropDatabaseTask {
163            catalog,
164            schema,
165            drop_if_exists,
166        })
167    }
168
169    /// Creates a [`DdlTask`] to alter a database.
170    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
171        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
172    }
173
174    /// Creates a [`DdlTask`] to alter a table.
175    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
176        DdlTask::AlterTable(AlterTableTask { alter_table })
177    }
178
179    /// Creates a [`DdlTask`] to truncate a table.
180    pub fn new_truncate_table(
181        catalog: String,
182        schema: String,
183        table: String,
184        table_id: TableId,
185        time_ranges: Vec<(Timestamp, Timestamp)>,
186    ) -> Self {
187        DdlTask::TruncateTable(TruncateTableTask {
188            catalog,
189            schema,
190            table,
191            table_id,
192            time_ranges,
193        })
194    }
195
196    /// Creates a [`DdlTask`] to create a view.
197    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
198        DdlTask::CreateView(CreateViewTask {
199            create_view,
200            view_info,
201        })
202    }
203}
204
205impl TryFrom<Task> for DdlTask {
206    type Error = error::Error;
207    fn try_from(task: Task) -> Result<Self> {
208        match task {
209            Task::CreateTableTask(create_table) => {
210                Ok(DdlTask::CreateTable(create_table.try_into()?))
211            }
212            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
213            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
214            Task::TruncateTableTask(truncate_table) => {
215                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
216            }
217            Task::CreateTableTasks(create_tables) => {
218                let tasks = create_tables
219                    .tasks
220                    .into_iter()
221                    .map(|task| task.try_into())
222                    .collect::<Result<Vec<_>>>()?;
223
224                Ok(DdlTask::CreateLogicalTables(tasks))
225            }
226            Task::DropTableTasks(drop_tables) => {
227                let tasks = drop_tables
228                    .tasks
229                    .into_iter()
230                    .map(|task| task.try_into())
231                    .collect::<Result<Vec<_>>>()?;
232
233                Ok(DdlTask::DropLogicalTables(tasks))
234            }
235            Task::AlterTableTasks(alter_tables) => {
236                let tasks = alter_tables
237                    .tasks
238                    .into_iter()
239                    .map(|task| task.try_into())
240                    .collect::<Result<Vec<_>>>()?;
241
242                Ok(DdlTask::AlterLogicalTables(tasks))
243            }
244            Task::CreateDatabaseTask(create_database) => {
245                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
246            }
247            Task::DropDatabaseTask(drop_database) => {
248                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
249            }
250            Task::AlterDatabaseTask(alter_database) => {
251                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
252            }
253            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
254            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
255            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
256            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
257            Task::CreateTriggerTask(create_trigger) => {
258                #[cfg(feature = "enterprise")]
259                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
260                #[cfg(not(feature = "enterprise"))]
261                {
262                    let _ = create_trigger;
263                    crate::error::UnsupportedSnafu {
264                        operation: "create trigger",
265                    }
266                    .fail()
267                }
268            }
269            Task::DropTriggerTask(drop_trigger) => {
270                #[cfg(feature = "enterprise")]
271                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
272                #[cfg(not(feature = "enterprise"))]
273                {
274                    let _ = drop_trigger;
275                    crate::error::UnsupportedSnafu {
276                        operation: "drop trigger",
277                    }
278                    .fail()
279                }
280            }
281        }
282    }
283}
284
/// A request to submit a [`DdlTask`], together with the query context it is
/// submitted under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// The query context attached to the request.
    pub query_context: QueryContextRef,
    /// The DDL task to submit.
    pub task: DdlTask,
}
290
291impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
292    type Error = error::Error;
293
294    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
295        let task = match request.task {
296            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
297            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
298            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
299            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
300            DdlTask::CreateLogicalTables(tasks) => {
301                let tasks = tasks
302                    .into_iter()
303                    .map(|task| task.try_into())
304                    .collect::<Result<Vec<_>>>()?;
305
306                Task::CreateTableTasks(PbCreateTableTasks { tasks })
307            }
308            DdlTask::DropLogicalTables(tasks) => {
309                let tasks = tasks
310                    .into_iter()
311                    .map(|task| task.into())
312                    .collect::<Vec<_>>();
313
314                Task::DropTableTasks(PbDropTableTasks { tasks })
315            }
316            DdlTask::AlterLogicalTables(tasks) => {
317                let tasks = tasks
318                    .into_iter()
319                    .map(|task| task.try_into())
320                    .collect::<Result<Vec<_>>>()?;
321
322                Task::AlterTableTasks(PbAlterTableTasks { tasks })
323            }
324            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
325            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
326            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
327            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
328            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
329            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
330            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
331            #[cfg(feature = "enterprise")]
332            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
333            #[cfg(feature = "enterprise")]
334            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
335        };
336
337        Ok(Self {
338            header: None,
339            query_context: Some((*request.query_context).clone().into()),
340            task: Some(task),
341        })
342    }
343}
344
/// The response to a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// The key of the procedure id carried by the protobuf response; empty
    /// when the response has no procedure id.
    pub key: Vec<u8>,
    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
351
352impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
353    type Error = error::Error;
354
355    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
356        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
357        Ok(Self {
358            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
359            table_ids,
360        })
361    }
362}
363
364impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
365    fn from(val: SubmitDdlTaskResponse) -> Self {
366        Self {
367            pid: Some(ProcedureId { key: val.key }),
368            table_ids: val
369                .table_ids
370                .into_iter()
371                .map(|id| api::v1::TableId { id })
372                .collect(),
373            ..Default::default()
374        }
375    }
376}
377
/// A `CREATE VIEW` task.
///
/// Serialized through serde as a base64 string wrapping the protobuf
/// encoding of the task (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf expression describing the view to create.
    pub create_view: CreateViewExpr,
    /// The raw table info of the view.
    pub view_info: RawTableInfo,
}
384
385impl CreateViewTask {
386    /// Returns the [`TableReference`] of view.
387    pub fn table_ref(&self) -> TableReference<'_> {
388        TableReference {
389            catalog: &self.create_view.catalog_name,
390            schema: &self.create_view.schema_name,
391            table: &self.create_view.view_name,
392        }
393    }
394
395    /// Returns the encoded logical plan
396    pub fn raw_logical_plan(&self) -> &Vec<u8> {
397        &self.create_view.logical_plan
398    }
399
400    /// Returns the view definition in SQL
401    pub fn view_definition(&self) -> &str {
402        &self.create_view.definition
403    }
404
405    /// Returns the resolved table names in view's logical plan
406    pub fn table_names(&self) -> HashSet<TableName> {
407        self.create_view
408            .table_names
409            .iter()
410            .map(|t| t.clone().into())
411            .collect()
412    }
413
414    /// Returns the view's columns
415    pub fn columns(&self) -> &Vec<String> {
416        &self.create_view.columns
417    }
418
419    /// Returns the original logical plan's columns
420    pub fn plan_columns(&self) -> &Vec<String> {
421        &self.create_view.plan_columns
422    }
423}
424
425impl TryFrom<PbCreateViewTask> for CreateViewTask {
426    type Error = error::Error;
427
428    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
429        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
430
431        Ok(CreateViewTask {
432            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
433                err_msg: "expected create view",
434            })?,
435            view_info,
436        })
437    }
438}
439
440impl TryFrom<CreateViewTask> for PbCreateViewTask {
441    type Error = error::Error;
442
443    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
444        Ok(PbCreateViewTask {
445            create_view: Some(task.create_view),
446            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
447        })
448    }
449}
450
451impl Serialize for CreateViewTask {
452    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
453    where
454        S: serde::Serializer,
455    {
456        let view_info = serde_json::to_vec(&self.view_info)
457            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
458
459        let pb = PbCreateViewTask {
460            create_view: Some(self.create_view.clone()),
461            view_info,
462        };
463        let buf = pb.encode_to_vec();
464        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
465        serializer.serialize_str(&encoded)
466    }
467}
468
469impl<'de> Deserialize<'de> for CreateViewTask {
470    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
471    where
472        D: serde::Deserializer<'de>,
473    {
474        let encoded = String::deserialize(deserializer)?;
475        let buf = general_purpose::STANDARD_NO_PAD
476            .decode(encoded)
477            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
478        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
479            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
480
481        let expr = CreateViewTask::try_from(expr)
482            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
483
484        Ok(expr)
485    }
486}
487
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog name of the view.
    pub catalog: String,
    /// Schema name of the view.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// Id of the view.
    pub view_id: TableId,
    /// The `drop_if_exists` flag of the drop expression.
    pub drop_if_exists: bool,
}
497
498impl DropViewTask {
499    /// Returns the [`TableReference`] of view.
500    pub fn table_ref(&self) -> TableReference<'_> {
501        TableReference {
502            catalog: &self.catalog,
503            schema: &self.schema,
504            table: &self.view,
505        }
506    }
507}
508
509impl TryFrom<PbDropViewTask> for DropViewTask {
510    type Error = error::Error;
511
512    fn try_from(pb: PbDropViewTask) -> Result<Self> {
513        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
514            err_msg: "expected drop view",
515        })?;
516
517        Ok(DropViewTask {
518            catalog: expr.catalog_name,
519            schema: expr.schema_name,
520            view: expr.view_name,
521            view_id: expr
522                .view_id
523                .context(error::InvalidProtoMsgSnafu {
524                    err_msg: "expected view_id",
525                })?
526                .id,
527            drop_if_exists: expr.drop_if_exists,
528        })
529    }
530}
531
532impl From<DropViewTask> for PbDropViewTask {
533    fn from(task: DropViewTask) -> Self {
534        PbDropViewTask {
535            drop_view: Some(DropViewExpr {
536                catalog_name: task.catalog,
537                schema_name: task.schema,
538                view_name: task.view,
539                view_id: Some(api::v1::TableId { id: task.view_id }),
540                drop_if_exists: task.drop_if_exists,
541            }),
542        }
543    }
544}
545
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// The `drop_if_exists` flag of the drop expression. Falls back to
    /// `false` when the field is missing from the serialized data.
    #[serde(default)]
    pub drop_if_exists: bool,
}
555
556impl DropTableTask {
557    pub fn table_ref(&self) -> TableReference<'_> {
558        TableReference {
559            catalog: &self.catalog,
560            schema: &self.schema,
561            table: &self.table,
562        }
563    }
564
565    pub fn table_name(&self) -> TableName {
566        TableName {
567            catalog_name: self.catalog.clone(),
568            schema_name: self.schema.clone(),
569            table_name: self.table.clone(),
570        }
571    }
572}
573
574impl TryFrom<PbDropTableTask> for DropTableTask {
575    type Error = error::Error;
576
577    fn try_from(pb: PbDropTableTask) -> Result<Self> {
578        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
579            err_msg: "expected drop table",
580        })?;
581
582        Ok(Self {
583            catalog: drop_table.catalog_name,
584            schema: drop_table.schema_name,
585            table: drop_table.table_name,
586            table_id: drop_table
587                .table_id
588                .context(error::InvalidProtoMsgSnafu {
589                    err_msg: "expected table_id",
590                })?
591                .id,
592            drop_if_exists: drop_table.drop_if_exists,
593        })
594    }
595}
596
597impl From<DropTableTask> for PbDropTableTask {
598    fn from(task: DropTableTask) -> Self {
599        PbDropTableTask {
600            drop_table: Some(DropTableExpr {
601                catalog_name: task.catalog,
602                schema_name: task.schema,
603                table_name: task.table,
604                table_id: Some(api::v1::TableId { id: task.table_id }),
605                drop_if_exists: task.drop_if_exists,
606            }),
607        }
608    }
609}
610
/// A `CREATE TABLE` task.
///
/// Serialized through serde as a base64 string wrapping the protobuf
/// encoding of the task (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The protobuf expression describing the table to create.
    pub create_table: CreateTableExpr,
    /// The partitions of the table.
    pub partitions: Vec<Partition>,
    /// The raw table info of the table.
    pub table_info: RawTableInfo,
}
617
618impl TryFrom<PbCreateTableTask> for CreateTableTask {
619    type Error = error::Error;
620
621    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
622        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
623
624        Ok(CreateTableTask::new(
625            pb.create_table.context(error::InvalidProtoMsgSnafu {
626                err_msg: "expected create table",
627            })?,
628            pb.partitions,
629            table_info,
630        ))
631    }
632}
633
634impl TryFrom<CreateTableTask> for PbCreateTableTask {
635    type Error = error::Error;
636
637    fn try_from(task: CreateTableTask) -> Result<Self> {
638        Ok(PbCreateTableTask {
639            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
640            create_table: Some(task.create_table),
641            partitions: task.partitions,
642        })
643    }
644}
645
646impl CreateTableTask {
647    pub fn new(
648        expr: CreateTableExpr,
649        partitions: Vec<Partition>,
650        table_info: RawTableInfo,
651    ) -> CreateTableTask {
652        CreateTableTask {
653            create_table: expr,
654            partitions,
655            table_info,
656        }
657    }
658
659    pub fn table_name(&self) -> TableName {
660        let table = &self.create_table;
661
662        TableName {
663            catalog_name: table.catalog_name.clone(),
664            schema_name: table.schema_name.clone(),
665            table_name: table.table_name.clone(),
666        }
667    }
668
669    pub fn table_ref(&self) -> TableReference<'_> {
670        let table = &self.create_table;
671
672        TableReference {
673            catalog: &table.catalog_name,
674            schema: &table.schema_name,
675            table: &table.table_name,
676        }
677    }
678
679    /// Sets the `table_info`'s table_id.
680    pub fn set_table_id(&mut self, table_id: TableId) {
681        self.table_info.ident.table_id = table_id;
682    }
683
684    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
685    ///
686    /// This function won't do any check or verification. Caller should
687    /// ensure this task is valid.
688    pub fn sort_columns(&mut self) {
689        // sort create table expr
690        // sort column_defs by name
691        self.create_table
692            .column_defs
693            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
694
695        self.table_info.sort_columns();
696    }
697}
698
699impl Serialize for CreateTableTask {
700    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
701    where
702        S: serde::Serializer,
703    {
704        let table_info = serde_json::to_vec(&self.table_info)
705            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
706
707        let pb = PbCreateTableTask {
708            create_table: Some(self.create_table.clone()),
709            partitions: self.partitions.clone(),
710            table_info,
711        };
712        let buf = pb.encode_to_vec();
713        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
714        serializer.serialize_str(&encoded)
715    }
716}
717
718impl<'de> Deserialize<'de> for CreateTableTask {
719    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
720    where
721        D: serde::Deserializer<'de>,
722    {
723        let encoded = String::deserialize(deserializer)?;
724        let buf = general_purpose::STANDARD_NO_PAD
725            .decode(encoded)
726            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
727        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
728            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
729
730        let expr = CreateTableTask::try_from(expr)
731            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
732
733        Ok(expr)
734    }
735}
736
/// An `ALTER TABLE` task.
///
/// Serialized through serde as a base64 string wrapping the protobuf
/// encoding of the task (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    /// The protobuf expression describing the alteration.
    pub alter_table: AlterTableExpr,
}
742
743impl AlterTableTask {
744    pub fn validate(&self) -> Result<()> {
745        self.alter_table
746            .kind
747            .as_ref()
748            .context(error::UnexpectedSnafu {
749                err_msg: "'kind' is absent",
750            })?;
751        Ok(())
752    }
753
754    pub fn table_ref(&self) -> TableReference<'_> {
755        TableReference {
756            catalog: &self.alter_table.catalog_name,
757            schema: &self.alter_table.schema_name,
758            table: &self.alter_table.table_name,
759        }
760    }
761
762    pub fn table_name(&self) -> TableName {
763        let table = &self.alter_table;
764
765        TableName {
766            catalog_name: table.catalog_name.clone(),
767            schema_name: table.schema_name.clone(),
768            table_name: table.table_name.clone(),
769        }
770    }
771}
772
773impl TryFrom<PbAlterTableTask> for AlterTableTask {
774    type Error = error::Error;
775
776    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
777        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
778            err_msg: "expected alter_table",
779        })?;
780
781        Ok(AlterTableTask { alter_table })
782    }
783}
784
785impl TryFrom<AlterTableTask> for PbAlterTableTask {
786    type Error = error::Error;
787
788    fn try_from(task: AlterTableTask) -> Result<Self> {
789        Ok(PbAlterTableTask {
790            alter_table: Some(task.alter_table),
791        })
792    }
793}
794
795impl Serialize for AlterTableTask {
796    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
797    where
798        S: serde::Serializer,
799    {
800        let pb = PbAlterTableTask {
801            alter_table: Some(self.alter_table.clone()),
802        };
803        let buf = pb.encode_to_vec();
804        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
805        serializer.serialize_str(&encoded)
806    }
807}
808
809impl<'de> Deserialize<'de> for AlterTableTask {
810    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
811    where
812        D: serde::Deserializer<'de>,
813    {
814        let encoded = String::deserialize(deserializer)?;
815        let buf = general_purpose::STANDARD_NO_PAD
816            .decode(encoded)
817            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
818        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
819            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
820
821        let expr = AlterTableTask::try_from(expr)
822            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
823
824        Ok(expr)
825    }
826}
827
828#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
829pub struct TruncateTableTask {
830    pub catalog: String,
831    pub schema: String,
832    pub table: String,
833    pub table_id: TableId,
834    pub time_ranges: Vec<(Timestamp, Timestamp)>,
835}
836
837impl TruncateTableTask {
838    pub fn table_ref(&self) -> TableReference<'_> {
839        TableReference {
840            catalog: &self.catalog,
841            schema: &self.schema,
842            table: &self.table,
843        }
844    }
845
846    pub fn table_name(&self) -> TableName {
847        TableName {
848            catalog_name: self.catalog.clone(),
849            schema_name: self.schema.clone(),
850            table_name: self.table.clone(),
851        }
852    }
853}
854
855impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
856    type Error = error::Error;
857
858    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
859        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
860            err_msg: "expected truncate table",
861        })?;
862
863        Ok(Self {
864            catalog: truncate_table.catalog_name,
865            schema: truncate_table.schema_name,
866            table: truncate_table.table_name,
867            table_id: truncate_table
868                .table_id
869                .context(error::InvalidProtoMsgSnafu {
870                    err_msg: "expected table_id",
871                })?
872                .id,
873            time_ranges: truncate_table
874                .time_ranges
875                .map(from_pb_time_ranges)
876                .transpose()
877                .map_err(BoxedError::new)
878                .context(ExternalSnafu)?
879                .unwrap_or_default(),
880        })
881    }
882}
883
884impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
885    type Error = error::Error;
886
887    fn try_from(task: TruncateTableTask) -> Result<Self> {
888        Ok(PbTruncateTableTask {
889            truncate_table: Some(TruncateTableExpr {
890                catalog_name: task.catalog,
891                schema_name: task.schema,
892                table_name: task.table,
893                table_id: Some(api::v1::TableId { id: task.table_id }),
894                time_ranges: Some(
895                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
896                ),
897            }),
898        })
899    }
900}
901
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    /// Catalog name of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
    /// The `create_if_not_exists` flag of the create expression.
    pub create_if_not_exists: bool,
    /// Database options. A serialized `null` is deserialized as the default
    /// (empty) map via `DefaultOnNull`.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
911
912impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
913    type Error = error::Error;
914
915    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
916        let CreateDatabaseExpr {
917            catalog_name,
918            schema_name,
919            create_if_not_exists,
920            options,
921        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
922            err_msg: "expected create database",
923        })?;
924
925        Ok(CreateDatabaseTask {
926            catalog: catalog_name,
927            schema: schema_name,
928            create_if_not_exists,
929            options,
930        })
931    }
932}
933
934impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
935    type Error = error::Error;
936
937    fn try_from(
938        CreateDatabaseTask {
939            catalog,
940            schema,
941            create_if_not_exists,
942            options,
943        }: CreateDatabaseTask,
944    ) -> Result<Self> {
945        Ok(PbCreateDatabaseTask {
946            create_database: Some(CreateDatabaseExpr {
947                catalog_name: catalog,
948                schema_name: schema,
949                create_if_not_exists,
950                options,
951            }),
952        })
953    }
954}
955
956#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
957pub struct DropDatabaseTask {
958    pub catalog: String,
959    pub schema: String,
960    pub drop_if_exists: bool,
961}
962
963impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
964    type Error = error::Error;
965
966    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
967        let DropDatabaseExpr {
968            catalog_name,
969            schema_name,
970            drop_if_exists,
971        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
972            err_msg: "expected drop database",
973        })?;
974
975        Ok(DropDatabaseTask {
976            catalog: catalog_name,
977            schema: schema_name,
978            drop_if_exists,
979        })
980    }
981}
982
983impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
984    type Error = error::Error;
985
986    fn try_from(
987        DropDatabaseTask {
988            catalog,
989            schema,
990            drop_if_exists,
991        }: DropDatabaseTask,
992    ) -> Result<Self> {
993        Ok(PbDropDatabaseTask {
994            drop_database: Some(DropDatabaseExpr {
995                catalog_name: catalog,
996                schema_name: schema,
997                drop_if_exists,
998            }),
999        })
1000    }
1001}
1002
1003#[derive(Debug, PartialEq, Clone)]
1004pub struct AlterDatabaseTask {
1005    pub alter_expr: AlterDatabaseExpr,
1006}
1007
1008impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1009    type Error = error::Error;
1010
1011    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1012        Ok(PbAlterDatabaseTask {
1013            task: Some(task.alter_expr),
1014        })
1015    }
1016}
1017
1018impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1019    type Error = error::Error;
1020
1021    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1022        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1023            err_msg: "expected alter database",
1024        })?;
1025
1026        Ok(AlterDatabaseTask { alter_expr })
1027    }
1028}
1029
1030impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1031    type Error = error::Error;
1032
1033    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1034        match pb {
1035            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1036                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1037                    options
1038                        .set_database_options
1039                        .into_iter()
1040                        .map(SetDatabaseOption::try_from)
1041                        .collect::<Result<Vec<_>>>()?,
1042                )))
1043            }
1044            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1045                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1046                    options
1047                        .keys
1048                        .iter()
1049                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1050                        .collect::<Result<Vec<_>>>()?,
1051                )),
1052            ),
1053        }
1054    }
1055}
1056
/// Lowercase option key that selects the TTL database option.
const TTL_KEY: &str = "ttl";
1058
1059impl TryFrom<PbOption> for SetDatabaseOption {
1060    type Error = error::Error;
1061
1062    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1063        let key_lower = key.to_ascii_lowercase();
1064        match key_lower.as_str() {
1065            TTL_KEY => {
1066                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1067                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1068
1069                Ok(SetDatabaseOption::Ttl(ttl))
1070            }
1071            _ => {
1072                if validate_database_option(&key_lower) {
1073                    Ok(SetDatabaseOption::Other(key_lower, value))
1074                } else {
1075                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1076                }
1077            }
1078        }
1079    }
1080}
1081
1082#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1083pub enum SetDatabaseOption {
1084    Ttl(DatabaseTimeToLive),
1085    Other(String, String),
1086}
1087
1088#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1089pub enum UnsetDatabaseOption {
1090    Ttl,
1091    Other(String),
1092}
1093
1094impl TryFrom<&str> for UnsetDatabaseOption {
1095    type Error = error::Error;
1096
1097    fn try_from(key: &str) -> Result<Self> {
1098        let key_lower = key.to_ascii_lowercase();
1099        match key_lower.as_str() {
1100            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1101            _ => {
1102                if validate_database_option(&key_lower) {
1103                    Ok(UnsetDatabaseOption::Other(key_lower))
1104                } else {
1105                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1106                }
1107            }
1108        }
1109    }
1110}
1111
1112#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1113pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1114
1115#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1116pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1117
1118#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1119pub enum AlterDatabaseKind {
1120    SetDatabaseOptions(SetDatabaseOptions),
1121    UnsetDatabaseOptions(UnsetDatabaseOptions),
1122}
1123
1124impl AlterDatabaseTask {
1125    pub fn catalog(&self) -> &str {
1126        &self.alter_expr.catalog_name
1127    }
1128
1129    pub fn schema(&self) -> &str {
1130        &self.alter_expr.catalog_name
1131    }
1132}
1133
1134/// Create flow
1135#[derive(Debug, Clone, Serialize, Deserialize)]
1136pub struct CreateFlowTask {
1137    pub catalog_name: String,
1138    pub flow_name: String,
1139    pub source_table_names: Vec<TableName>,
1140    pub sink_table_name: TableName,
1141    pub or_replace: bool,
1142    pub create_if_not_exists: bool,
1143    /// Duration in seconds. Data older than this duration will not be used.
1144    pub expire_after: Option<i64>,
1145    pub eval_interval_secs: Option<i64>,
1146    pub comment: String,
1147    pub sql: String,
1148    pub flow_options: HashMap<String, String>,
1149}
1150
1151impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1152    type Error = error::Error;
1153
1154    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1155        let CreateFlowExpr {
1156            catalog_name,
1157            flow_name,
1158            source_table_names,
1159            sink_table_name,
1160            or_replace,
1161            create_if_not_exists,
1162            expire_after,
1163            eval_interval,
1164            comment,
1165            sql,
1166            flow_options,
1167        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1168            err_msg: "expected create_flow",
1169        })?;
1170
1171        Ok(CreateFlowTask {
1172            catalog_name,
1173            flow_name,
1174            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1175            sink_table_name: sink_table_name
1176                .context(error::InvalidProtoMsgSnafu {
1177                    err_msg: "expected sink_table_name",
1178                })?
1179                .into(),
1180            or_replace,
1181            create_if_not_exists,
1182            expire_after: expire_after.map(|e| e.value),
1183            eval_interval_secs: eval_interval.map(|e| e.seconds),
1184            comment,
1185            sql,
1186            flow_options,
1187        })
1188    }
1189}
1190
1191impl From<CreateFlowTask> for PbCreateFlowTask {
1192    fn from(
1193        CreateFlowTask {
1194            catalog_name,
1195            flow_name,
1196            source_table_names,
1197            sink_table_name,
1198            or_replace,
1199            create_if_not_exists,
1200            expire_after,
1201            eval_interval_secs: eval_interval,
1202            comment,
1203            sql,
1204            flow_options,
1205        }: CreateFlowTask,
1206    ) -> Self {
1207        PbCreateFlowTask {
1208            create_flow: Some(CreateFlowExpr {
1209                catalog_name,
1210                flow_name,
1211                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1212                sink_table_name: Some(sink_table_name.into()),
1213                or_replace,
1214                create_if_not_exists,
1215                expire_after: expire_after.map(|value| ExpireAfter { value }),
1216                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1217                comment,
1218                sql,
1219                flow_options,
1220            }),
1221        }
1222    }
1223}
1224
1225/// Drop flow
1226#[derive(Debug, Clone, Serialize, Deserialize)]
1227pub struct DropFlowTask {
1228    pub catalog_name: String,
1229    pub flow_name: String,
1230    pub flow_id: FlowId,
1231    pub drop_if_exists: bool,
1232}
1233
1234impl TryFrom<PbDropFlowTask> for DropFlowTask {
1235    type Error = error::Error;
1236
1237    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1238        let DropFlowExpr {
1239            catalog_name,
1240            flow_name,
1241            flow_id,
1242            drop_if_exists,
1243        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1244            err_msg: "expected drop_flow",
1245        })?;
1246        let flow_id = flow_id
1247            .context(error::InvalidProtoMsgSnafu {
1248                err_msg: "expected flow_id",
1249            })?
1250            .id;
1251        Ok(DropFlowTask {
1252            catalog_name,
1253            flow_name,
1254            flow_id,
1255            drop_if_exists,
1256        })
1257    }
1258}
1259
1260impl From<DropFlowTask> for PbDropFlowTask {
1261    fn from(
1262        DropFlowTask {
1263            catalog_name,
1264            flow_name,
1265            flow_id,
1266            drop_if_exists,
1267        }: DropFlowTask,
1268    ) -> Self {
1269        PbDropFlowTask {
1270            drop_flow: Some(DropFlowExpr {
1271                catalog_name,
1272                flow_name,
1273                flow_id: Some(api::v1::FlowId { id: flow_id }),
1274                drop_if_exists,
1275            }),
1276        }
1277    }
1278}
1279
1280#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1281pub struct QueryContext {
1282    pub(crate) current_catalog: String,
1283    pub(crate) current_schema: String,
1284    pub(crate) timezone: String,
1285    pub(crate) extensions: HashMap<String, String>,
1286    pub(crate) channel: u8,
1287}
1288
1289impl QueryContext {
1290    /// Get the current catalog
1291    pub fn current_catalog(&self) -> &str {
1292        &self.current_catalog
1293    }
1294
1295    /// Get the current schema
1296    pub fn current_schema(&self) -> &str {
1297        &self.current_schema
1298    }
1299
1300    /// Get the timezone
1301    pub fn timezone(&self) -> &str {
1302        &self.timezone
1303    }
1304
1305    /// Get the extensions
1306    pub fn extensions(&self) -> &HashMap<String, String> {
1307        &self.extensions
1308    }
1309
1310    /// Get the channel
1311    pub fn channel(&self) -> u8 {
1312        self.channel
1313    }
1314}
1315
1316/// Lightweight query context for flow operations containing only essential fields.
1317/// This is a subset of QueryContext that includes only the fields actually needed
1318/// for flow creation and execution.
1319#[derive(Debug, Clone, Serialize, PartialEq)]
1320pub struct FlowQueryContext {
1321    /// Current catalog name - needed for flow metadata and recovery
1322    pub(crate) catalog: String,
1323    /// Current schema name - needed for table resolution during flow execution
1324    pub(crate) schema: String,
1325    /// Timezone for timestamp operations in the flow
1326    pub(crate) timezone: String,
1327}
1328
1329impl<'de> Deserialize<'de> for FlowQueryContext {
1330    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1331    where
1332        D: serde::Deserializer<'de>,
1333    {
1334        // Support both QueryContext format and FlowQueryContext format
1335        #[derive(Deserialize)]
1336        #[serde(untagged)]
1337        enum ContextCompat {
1338            Flow(FlowQueryContextHelper),
1339            Full(QueryContext),
1340        }
1341
1342        #[derive(Deserialize)]
1343        struct FlowQueryContextHelper {
1344            catalog: String,
1345            schema: String,
1346            timezone: String,
1347        }
1348
1349        match ContextCompat::deserialize(deserializer)? {
1350            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1351                catalog: helper.catalog,
1352                schema: helper.schema,
1353                timezone: helper.timezone,
1354            }),
1355            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1356        }
1357    }
1358}
1359
1360impl From<QueryContextRef> for QueryContext {
1361    fn from(query_context: QueryContextRef) -> Self {
1362        QueryContext {
1363            current_catalog: query_context.current_catalog().to_string(),
1364            current_schema: query_context.current_schema().clone(),
1365            timezone: query_context.timezone().to_string(),
1366            extensions: query_context.extensions(),
1367            channel: query_context.channel() as u8,
1368        }
1369    }
1370}
1371
1372impl TryFrom<QueryContext> for session::context::QueryContext {
1373    type Error = error::Error;
1374    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1375        Ok(QueryContextBuilder::default()
1376            .current_catalog(value.current_catalog)
1377            .current_schema(value.current_schema)
1378            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1379            .extensions(value.extensions)
1380            .channel((value.channel as u32).into())
1381            .build())
1382    }
1383}
1384
1385impl From<QueryContext> for PbQueryContext {
1386    fn from(
1387        QueryContext {
1388            current_catalog,
1389            current_schema,
1390            timezone,
1391            extensions,
1392            channel,
1393        }: QueryContext,
1394    ) -> Self {
1395        PbQueryContext {
1396            current_catalog,
1397            current_schema,
1398            timezone,
1399            extensions,
1400            channel: channel as u32,
1401            snapshot_seqs: None,
1402            explain: None,
1403        }
1404    }
1405}
1406
1407impl From<QueryContext> for FlowQueryContext {
1408    fn from(ctx: QueryContext) -> Self {
1409        Self {
1410            catalog: ctx.current_catalog,
1411            schema: ctx.current_schema,
1412            timezone: ctx.timezone,
1413        }
1414    }
1415}
1416
1417impl From<QueryContextRef> for FlowQueryContext {
1418    fn from(ctx: QueryContextRef) -> Self {
1419        Self {
1420            catalog: ctx.current_catalog().to_string(),
1421            schema: ctx.current_schema().clone(),
1422            timezone: ctx.timezone().to_string(),
1423        }
1424    }
1425}
1426
1427impl From<FlowQueryContext> for QueryContext {
1428    fn from(flow_ctx: FlowQueryContext) -> Self {
1429        Self {
1430            current_catalog: flow_ctx.catalog,
1431            current_schema: flow_ctx.schema,
1432            timezone: flow_ctx.timezone,
1433            extensions: HashMap::new(),
1434            channel: 0, // Use default channel for flows
1435        }
1436    }
1437}
1438
1439impl From<FlowQueryContext> for PbQueryContext {
1440    fn from(flow_ctx: FlowQueryContext) -> Self {
1441        let query_ctx: QueryContext = flow_ctx.into();
1442        query_ctx.into()
1443    }
1444}
1445
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    /// A `CreateTableTask` must survive a JSON encode/decode round trip unchanged.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let empty_schema = SchemaBuilder::default().build().unwrap();
        let info = test_table_info(1025, "foo", "bar", "baz", Arc::new(empty_schema));
        let original = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(info),
        );

        let bytes = serde_json::to_vec(&original).unwrap();
        let restored: CreateTableTask = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(original, restored);
    }

    /// An `AlterTableTask` must survive a JSON encode/decode round trip unchanged.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let original = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let bytes = serde_json::to_vec(&original).unwrap();
        let restored: AlterTableTask = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(original, restored);
    }

    /// `sort_columns` must reorder the column definitions and update the
    /// index fields of the table info accordingly.
    #[test]
    fn test_sort_columns() {
        // Schema with the columns deliberately out of name order.
        let tag_column = ColumnSchema::new(
            "column3".to_string(),
            ConcreteDataType::string_datatype(),
            true,
        );
        let time_index_column = ColumnSchema::new(
            "column1".to_string(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field_column = ColumnSchema::new(
            "column2".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        );

        let table_info = RawTableInfo {
            ident: Default::default(),
            meta: RawTableMeta {
                schema: RawSchema {
                    column_schemas: vec![tag_column, time_index_column, field_column],
                    timestamp_index: Some(1),
                    version: 0,
                },
                primary_key_indices: vec![0],
                value_indices: vec![2],
                engine: METRIC_ENGINE_NAME.to_string(),
                next_column_id: 0,
                region_numbers: vec![0],
                options: Default::default(),
                created_on: Default::default(),
                updated_on: Default::default(),
                partition_key_indices: Default::default(),
                column_ids: Default::default(),
            },
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // Matching create-table expression, in the same (unsorted) order.
        let column_def = |name: &str, semantic_type: SemanticType| ColumnDef {
            name: name.to_string(),
            semantic_type: semantic_type as i32,
            ..Default::default()
        };
        let expr = CreateTableExpr {
            column_defs: vec![
                column_def("column3", SemanticType::Tag),
                column_def("column1", SemanticType::Timestamp),
                column_def("column2", SemanticType::Field),
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task = CreateTableTask::new(expr, Vec::new(), table_info);
        create_table_task.sort_columns();

        // Column definitions end up ordered by name.
        for (idx, expected) in ["column1", "column2", "column3"].iter().enumerate() {
            assert_eq!(
                create_table_task.create_table.column_defs[idx].name,
                expected.to_string()
            );
        }

        // The table info indices follow the new column positions.
        let meta = &create_table_task.table_info.meta;
        assert_eq!(meta.schema.timestamp_index, Some(0));
        assert_eq!(meta.primary_key_indices, vec![2]);
        assert_eq!(meta.value_indices, vec![0, 1]);
    }

    /// Narrowing a `QueryContext` keeps catalog, schema and timezone.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let extensions: HashMap<String, String> = [("key1", "value1"), ("key2", "value2")]
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        let full_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let narrowed = FlowQueryContext::from(full_ctx);

        assert_eq!(narrowed.catalog, "test_catalog");
        assert_eq!(narrowed.schema, "test_schema");
        assert_eq!(narrowed.timezone, "UTC");
    }

    /// Expanding a `FlowQueryContext` fills defaults and round-trips back.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let expanded = QueryContext::from(flow_ctx.clone());

        assert_eq!(expanded.current_catalog, "prod_catalog");
        assert_eq!(expanded.current_schema, "public");
        assert_eq!(expanded.timezone, "America/New_York");
        assert!(expanded.extensions.is_empty());
        assert_eq!(expanded.channel, 0);

        // Test roundtrip conversion
        let round_tripped = FlowQueryContext::from(expanded);
        assert_eq!(flow_ctx, round_tripped);
    }

    /// A session query context reference converts into a `FlowQueryContext`.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx_ref = Arc::new(
            QueryContextBuilder::default()
                .current_catalog("session_catalog".to_string())
                .current_schema("session_schema".to_string())
                .timezone(Timezone::from_tz_string("Europe/London").unwrap())
                .build(),
        );

        let flow_ctx = FlowQueryContext::from(session_ctx_ref);

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    /// JSON serialization uses the compact field names and round-trips.
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let json = serde_json::to_string(&flow_ctx).unwrap();
        let restored: FlowQueryContext = serde_json::from_str(&json).unwrap();

        assert_eq!(flow_ctx, restored);

        // Verify JSON structure
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(value["catalog"], "test_catalog");
        assert_eq!(value["schema"], "test_schema");
        assert_eq!(value["timezone"], "UTC");
    }

    /// Converting to protobuf yields default extension/channel/optional fields.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx = PbQueryContext::from(flow_ctx);

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}