common_meta/rpc/ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32    DropViewTask as PbDropViewTask, Partition, ProcedureId,
33    TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
38    ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::Engine as _;
41use base64::engine::general_purpose;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{DefaultOnNull, serde_as};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::table_name::TableName;
51use table::table_reference::TableReference;
52
53use crate::error::{
54    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
55    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
56};
57use crate::key::FlowId;
58
/// DDL tasks.
///
/// One variant per DDL operation that can be submitted over the meta RPC.
/// The `*LogicalTables` variants carry a batch of per-table tasks handled
/// as one unit. Trigger variants exist only with the `enterprise` feature.
#[derive(Debug, Clone)]
pub enum DdlTask {
    CreateTable(CreateTableTask),
    DropTable(DropTableTask),
    AlterTable(AlterTableTask),
    TruncateTable(TruncateTableTask),
    // Batched operations over logical tables.
    CreateLogicalTables(Vec<CreateTableTask>),
    DropLogicalTables(Vec<DropTableTask>),
    AlterLogicalTables(Vec<AlterTableTask>),
    CreateDatabase(CreateDatabaseTask),
    DropDatabase(DropDatabaseTask),
    AlterDatabase(AlterDatabaseTask),
    CreateFlow(CreateFlowTask),
    DropFlow(DropFlowTask),
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    CreateView(CreateViewTask),
    DropView(DropViewTask),
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
81
82impl DdlTask {
83    /// Creates a [`DdlTask`] to create a flow.
84    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
85        DdlTask::CreateFlow(expr)
86    }
87
88    /// Creates a [`DdlTask`] to drop a flow.
89    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
90        DdlTask::DropFlow(expr)
91    }
92
93    /// Creates a [`DdlTask`] to drop a view.
94    pub fn new_drop_view(expr: DropViewTask) -> Self {
95        DdlTask::DropView(expr)
96    }
97
98    /// Creates a [`DdlTask`] to create a table.
99    pub fn new_create_table(
100        expr: CreateTableExpr,
101        partitions: Vec<Partition>,
102        table_info: RawTableInfo,
103    ) -> Self {
104        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
105    }
106
107    /// Creates a [`DdlTask`] to create several logical tables.
108    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
109        DdlTask::CreateLogicalTables(
110            table_data
111                .into_iter()
112                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
113                .collect(),
114        )
115    }
116
117    /// Creates a [`DdlTask`] to alter several logical tables.
118    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
119        DdlTask::AlterLogicalTables(
120            table_data
121                .into_iter()
122                .map(|alter_table| AlterTableTask { alter_table })
123                .collect(),
124        )
125    }
126
127    /// Creates a [`DdlTask`] to drop a table.
128    pub fn new_drop_table(
129        catalog: String,
130        schema: String,
131        table: String,
132        table_id: TableId,
133        drop_if_exists: bool,
134    ) -> Self {
135        DdlTask::DropTable(DropTableTask {
136            catalog,
137            schema,
138            table,
139            table_id,
140            drop_if_exists,
141        })
142    }
143
144    /// Creates a [`DdlTask`] to create a database.
145    pub fn new_create_database(
146        catalog: String,
147        schema: String,
148        create_if_not_exists: bool,
149        options: HashMap<String, String>,
150    ) -> Self {
151        DdlTask::CreateDatabase(CreateDatabaseTask {
152            catalog,
153            schema,
154            create_if_not_exists,
155            options,
156        })
157    }
158
159    /// Creates a [`DdlTask`] to drop a database.
160    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
161        DdlTask::DropDatabase(DropDatabaseTask {
162            catalog,
163            schema,
164            drop_if_exists,
165        })
166    }
167
168    /// Creates a [`DdlTask`] to alter a database.
169    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
170        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
171    }
172
173    /// Creates a [`DdlTask`] to alter a table.
174    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
175        DdlTask::AlterTable(AlterTableTask { alter_table })
176    }
177
178    /// Creates a [`DdlTask`] to truncate a table.
179    pub fn new_truncate_table(
180        catalog: String,
181        schema: String,
182        table: String,
183        table_id: TableId,
184        time_ranges: Vec<(Timestamp, Timestamp)>,
185    ) -> Self {
186        DdlTask::TruncateTable(TruncateTableTask {
187            catalog,
188            schema,
189            table,
190            table_id,
191            time_ranges,
192        })
193    }
194
195    /// Creates a [`DdlTask`] to create a view.
196    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
197        DdlTask::CreateView(CreateViewTask {
198            create_view,
199            view_info,
200        })
201    }
202}
203
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    /// Decodes a protobuf DDL [`Task`] into its in-memory [`DdlTask`] form.
    ///
    /// Batch variants (`CreateTableTasks`, `DropTableTasks`, `AlterTableTasks`)
    /// are converted element-wise; the first failing element aborts the whole
    /// conversion. Trigger tasks require the `enterprise` feature; without it
    /// they yield an `Unsupported` error.
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            Task::CreateTriggerTask(create_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                // Without the enterprise feature, triggers are unsupported;
                // `let _` consumes the payload to silence the unused warning.
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                // Same pattern as above: reject trigger tasks in OSS builds.
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
        }
    }
}
283
/// A DDL task bundled with the query context it should run under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Session context forwarded alongside the task (converted into the
    /// protobuf `QueryContext` when the request is encoded).
    pub query_context: QueryContextRef,
    pub task: DdlTask,
}
289
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
    type Error = error::Error;

    /// Encodes a [`SubmitDdlTaskRequest`] into its protobuf form.
    ///
    /// Fallible arms serialize embedded table metadata (JSON) during the
    /// conversion; batch variants convert each element and fail on the first
    /// error. Trigger variants only exist with the `enterprise` feature.
    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
        let task = match request.task {
            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
            DdlTask::CreateLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::CreateTableTasks(PbCreateTableTasks { tasks })
            }
            DdlTask::DropLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.into())
                    .collect::<Vec<_>>();

                Task::DropTableTasks(PbDropTableTasks { tasks })
            }
            DdlTask::AlterLogicalTables(tasks) => {
                let tasks = tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Task::AlterTableTasks(PbAlterTableTasks { tasks })
            }
            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
            #[cfg(feature = "enterprise")]
            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
            #[cfg(feature = "enterprise")]
            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
        };

        Ok(Self {
            // No request header is attached at this layer.
            header: None,
            query_context: Some((*request.query_context).clone().into()),
            task: Some(task),
        })
    }
}
343
/// Result of submitting a DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    // Procedure id key bytes (taken from the response's `pid`; empty if absent).
    pub key: Vec<u8>,
    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
350
351impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
352    type Error = error::Error;
353
354    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
355        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
356        Ok(Self {
357            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
358            table_ids,
359        })
360    }
361}
362
363impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
364    fn from(val: SubmitDdlTaskResponse) -> Self {
365        Self {
366            pid: Some(ProcedureId { key: val.key }),
367            table_ids: val
368                .table_ids
369                .into_iter()
370                .map(|id| api::v1::TableId { id })
371                .collect(),
372            ..Default::default()
373        }
374    }
375}
376
/// A `CREATE VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf expression describing the view.
    pub create_view: CreateViewExpr,
    /// Raw table metadata of the view (JSON-encoded when sent over the wire).
    pub view_info: RawTableInfo,
}
383
384impl CreateViewTask {
385    /// Returns the [`TableReference`] of view.
386    pub fn table_ref(&self) -> TableReference {
387        TableReference {
388            catalog: &self.create_view.catalog_name,
389            schema: &self.create_view.schema_name,
390            table: &self.create_view.view_name,
391        }
392    }
393
394    /// Returns the encoded logical plan
395    pub fn raw_logical_plan(&self) -> &Vec<u8> {
396        &self.create_view.logical_plan
397    }
398
399    /// Returns the view definition in SQL
400    pub fn view_definition(&self) -> &str {
401        &self.create_view.definition
402    }
403
404    /// Returns the resolved table names in view's logical plan
405    pub fn table_names(&self) -> HashSet<TableName> {
406        self.create_view
407            .table_names
408            .iter()
409            .map(|t| t.clone().into())
410            .collect()
411    }
412
413    /// Returns the view's columns
414    pub fn columns(&self) -> &Vec<String> {
415        &self.create_view.columns
416    }
417
418    /// Returns the original logical plan's columns
419    pub fn plan_columns(&self) -> &Vec<String> {
420        &self.create_view.plan_columns
421    }
422}
423
424impl TryFrom<PbCreateViewTask> for CreateViewTask {
425    type Error = error::Error;
426
427    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
428        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
429
430        Ok(CreateViewTask {
431            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
432                err_msg: "expected create view",
433            })?,
434            view_info,
435        })
436    }
437}
438
439impl TryFrom<CreateViewTask> for PbCreateViewTask {
440    type Error = error::Error;
441
442    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
443        Ok(PbCreateViewTask {
444            create_view: Some(task.create_view),
445            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
446        })
447    }
448}
449
450impl Serialize for CreateViewTask {
451    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
452    where
453        S: serde::Serializer,
454    {
455        let view_info = serde_json::to_vec(&self.view_info)
456            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
457
458        let pb = PbCreateViewTask {
459            create_view: Some(self.create_view.clone()),
460            view_info,
461        };
462        let buf = pb.encode_to_vec();
463        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
464        serializer.serialize_str(&encoded)
465    }
466}
467
468impl<'de> Deserialize<'de> for CreateViewTask {
469    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
470    where
471        D: serde::Deserializer<'de>,
472    {
473        let encoded = String::deserialize(deserializer)?;
474        let buf = general_purpose::STANDARD_NO_PAD
475            .decode(encoded)
476            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
478            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480        let expr = CreateViewTask::try_from(expr)
481            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
482
483        Ok(expr)
484    }
485}
486
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    pub catalog: String,
    pub schema: String,
    pub view: String,
    pub view_id: TableId,
    // `IF EXISTS` flag carried through from the protobuf expression.
    pub drop_if_exists: bool,
}
496
497impl DropViewTask {
498    /// Returns the [`TableReference`] of view.
499    pub fn table_ref(&self) -> TableReference {
500        TableReference {
501            catalog: &self.catalog,
502            schema: &self.schema,
503            table: &self.view,
504        }
505    }
506}
507
508impl TryFrom<PbDropViewTask> for DropViewTask {
509    type Error = error::Error;
510
511    fn try_from(pb: PbDropViewTask) -> Result<Self> {
512        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
513            err_msg: "expected drop view",
514        })?;
515
516        Ok(DropViewTask {
517            catalog: expr.catalog_name,
518            schema: expr.schema_name,
519            view: expr.view_name,
520            view_id: expr
521                .view_id
522                .context(error::InvalidProtoMsgSnafu {
523                    err_msg: "expected view_id",
524                })?
525                .id,
526            drop_if_exists: expr.drop_if_exists,
527        })
528    }
529}
530
531impl From<DropViewTask> for PbDropViewTask {
532    fn from(task: DropViewTask) -> Self {
533        PbDropViewTask {
534            drop_view: Some(DropViewExpr {
535                catalog_name: task.catalog,
536                schema_name: task.schema,
537                view_name: task.view,
538                view_id: Some(api::v1::TableId { id: task.view_id }),
539                drop_if_exists: task.drop_if_exists,
540            }),
541        }
542    }
543}
544
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // `IF EXISTS` flag. `#[serde(default)]` keeps payloads serialized before
    // this field existed deserializable (missing => false).
    #[serde(default)]
    pub drop_if_exists: bool,
}
554
555impl DropTableTask {
556    pub fn table_ref(&self) -> TableReference {
557        TableReference {
558            catalog: &self.catalog,
559            schema: &self.schema,
560            table: &self.table,
561        }
562    }
563
564    pub fn table_name(&self) -> TableName {
565        TableName {
566            catalog_name: self.catalog.to_string(),
567            schema_name: self.schema.to_string(),
568            table_name: self.table.to_string(),
569        }
570    }
571}
572
573impl TryFrom<PbDropTableTask> for DropTableTask {
574    type Error = error::Error;
575
576    fn try_from(pb: PbDropTableTask) -> Result<Self> {
577        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
578            err_msg: "expected drop table",
579        })?;
580
581        Ok(Self {
582            catalog: drop_table.catalog_name,
583            schema: drop_table.schema_name,
584            table: drop_table.table_name,
585            table_id: drop_table
586                .table_id
587                .context(error::InvalidProtoMsgSnafu {
588                    err_msg: "expected table_id",
589                })?
590                .id,
591            drop_if_exists: drop_table.drop_if_exists,
592        })
593    }
594}
595
596impl From<DropTableTask> for PbDropTableTask {
597    fn from(task: DropTableTask) -> Self {
598        PbDropTableTask {
599            drop_table: Some(DropTableExpr {
600                catalog_name: task.catalog,
601                schema_name: task.schema,
602                table_name: task.table,
603                table_id: Some(api::v1::TableId { id: task.table_id }),
604                drop_if_exists: task.drop_if_exists,
605            }),
606        }
607    }
608}
609
/// A `CREATE TABLE` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The protobuf expression describing the table.
    pub create_table: CreateTableExpr,
    /// Partition definitions (empty for logical tables).
    pub partitions: Vec<Partition>,
    /// Raw table metadata (JSON-encoded when sent over the wire).
    pub table_info: RawTableInfo,
}
616
617impl TryFrom<PbCreateTableTask> for CreateTableTask {
618    type Error = error::Error;
619
620    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
621        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
622
623        Ok(CreateTableTask::new(
624            pb.create_table.context(error::InvalidProtoMsgSnafu {
625                err_msg: "expected create table",
626            })?,
627            pb.partitions,
628            table_info,
629        ))
630    }
631}
632
633impl TryFrom<CreateTableTask> for PbCreateTableTask {
634    type Error = error::Error;
635
636    fn try_from(task: CreateTableTask) -> Result<Self> {
637        Ok(PbCreateTableTask {
638            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
639            create_table: Some(task.create_table),
640            partitions: task.partitions,
641        })
642    }
643}
644
645impl CreateTableTask {
646    pub fn new(
647        expr: CreateTableExpr,
648        partitions: Vec<Partition>,
649        table_info: RawTableInfo,
650    ) -> CreateTableTask {
651        CreateTableTask {
652            create_table: expr,
653            partitions,
654            table_info,
655        }
656    }
657
658    pub fn table_name(&self) -> TableName {
659        let table = &self.create_table;
660
661        TableName {
662            catalog_name: table.catalog_name.to_string(),
663            schema_name: table.schema_name.to_string(),
664            table_name: table.table_name.to_string(),
665        }
666    }
667
668    pub fn table_ref(&self) -> TableReference {
669        let table = &self.create_table;
670
671        TableReference {
672            catalog: &table.catalog_name,
673            schema: &table.schema_name,
674            table: &table.table_name,
675        }
676    }
677
678    /// Sets the `table_info`'s table_id.
679    pub fn set_table_id(&mut self, table_id: TableId) {
680        self.table_info.ident.table_id = table_id;
681    }
682
683    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
684    ///
685    /// This function won't do any check or verification. Caller should
686    /// ensure this task is valid.
687    pub fn sort_columns(&mut self) {
688        // sort create table expr
689        // sort column_defs by name
690        self.create_table
691            .column_defs
692            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
693
694        self.table_info.sort_columns();
695    }
696}
697
698impl Serialize for CreateTableTask {
699    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
700    where
701        S: serde::Serializer,
702    {
703        let table_info = serde_json::to_vec(&self.table_info)
704            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
705
706        let pb = PbCreateTableTask {
707            create_table: Some(self.create_table.clone()),
708            partitions: self.partitions.clone(),
709            table_info,
710        };
711        let buf = pb.encode_to_vec();
712        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
713        serializer.serialize_str(&encoded)
714    }
715}
716
717impl<'de> Deserialize<'de> for CreateTableTask {
718    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
719    where
720        D: serde::Deserializer<'de>,
721    {
722        let encoded = String::deserialize(deserializer)?;
723        let buf = general_purpose::STANDARD_NO_PAD
724            .decode(encoded)
725            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
726        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
727            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
728
729        let expr = CreateTableTask::try_from(expr)
730            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
731
732        Ok(expr)
733    }
734}
735
/// An `ALTER TABLE` task, thinly wrapping the protobuf expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    pub alter_table: AlterTableExpr,
}
741
742impl AlterTableTask {
743    pub fn validate(&self) -> Result<()> {
744        self.alter_table
745            .kind
746            .as_ref()
747            .context(error::UnexpectedSnafu {
748                err_msg: "'kind' is absent",
749            })?;
750        Ok(())
751    }
752
753    pub fn table_ref(&self) -> TableReference {
754        TableReference {
755            catalog: &self.alter_table.catalog_name,
756            schema: &self.alter_table.schema_name,
757            table: &self.alter_table.table_name,
758        }
759    }
760
761    pub fn table_name(&self) -> TableName {
762        let table = &self.alter_table;
763
764        TableName {
765            catalog_name: table.catalog_name.to_string(),
766            schema_name: table.schema_name.to_string(),
767            table_name: table.table_name.to_string(),
768        }
769    }
770}
771
772impl TryFrom<PbAlterTableTask> for AlterTableTask {
773    type Error = error::Error;
774
775    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
776        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
777            err_msg: "expected alter_table",
778        })?;
779
780        Ok(AlterTableTask { alter_table })
781    }
782}
783
784impl TryFrom<AlterTableTask> for PbAlterTableTask {
785    type Error = error::Error;
786
787    fn try_from(task: AlterTableTask) -> Result<Self> {
788        Ok(PbAlterTableTask {
789            alter_table: Some(task.alter_table),
790        })
791    }
792}
793
794impl Serialize for AlterTableTask {
795    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
796    where
797        S: serde::Serializer,
798    {
799        let pb = PbAlterTableTask {
800            alter_table: Some(self.alter_table.clone()),
801        };
802        let buf = pb.encode_to_vec();
803        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
804        serializer.serialize_str(&encoded)
805    }
806}
807
808impl<'de> Deserialize<'de> for AlterTableTask {
809    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
810    where
811        D: serde::Deserializer<'de>,
812    {
813        let encoded = String::deserialize(deserializer)?;
814        let buf = general_purpose::STANDARD_NO_PAD
815            .decode(encoded)
816            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
817        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
818            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
819
820        let expr = AlterTableTask::try_from(expr)
821            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
822
823        Ok(expr)
824    }
825}
826
/// A `TRUNCATE TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    pub catalog: String,
    pub schema: String,
    pub table: String,
    pub table_id: TableId,
    // Time ranges to truncate; empty means the whole table.
    // NOTE(review): bound semantics (inclusive/exclusive) are defined by
    // `from_pb_time_ranges`/`to_pb_time_ranges` — confirm there.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
835
836impl TruncateTableTask {
837    pub fn table_ref(&self) -> TableReference {
838        TableReference {
839            catalog: &self.catalog,
840            schema: &self.schema,
841            table: &self.table,
842        }
843    }
844
845    pub fn table_name(&self) -> TableName {
846        TableName {
847            catalog_name: self.catalog.to_string(),
848            schema_name: self.schema.to_string(),
849            table_name: self.table.to_string(),
850        }
851    }
852}
853
854impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
855    type Error = error::Error;
856
857    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
858        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
859            err_msg: "expected truncate table",
860        })?;
861
862        Ok(Self {
863            catalog: truncate_table.catalog_name,
864            schema: truncate_table.schema_name,
865            table: truncate_table.table_name,
866            table_id: truncate_table
867                .table_id
868                .context(error::InvalidProtoMsgSnafu {
869                    err_msg: "expected table_id",
870                })?
871                .id,
872            time_ranges: truncate_table
873                .time_ranges
874                .map(from_pb_time_ranges)
875                .transpose()
876                .map_err(BoxedError::new)
877                .context(ExternalSnafu)?
878                .unwrap_or_default(),
879        })
880    }
881}
882
883impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
884    type Error = error::Error;
885
886    fn try_from(task: TruncateTableTask) -> Result<Self> {
887        Ok(PbTruncateTableTask {
888            truncate_table: Some(TruncateTableExpr {
889                catalog_name: task.catalog,
890                schema_name: task.schema,
891                table_name: task.table,
892                table_id: Some(api::v1::TableId { id: task.table_id }),
893                time_ranges: Some(
894                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
895                ),
896            }),
897        })
898    }
899}
900
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    // `DefaultOnNull` makes a serialized `null` deserialize as an empty map
    // instead of failing.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
910
911impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
912    type Error = error::Error;
913
914    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
915        let CreateDatabaseExpr {
916            catalog_name,
917            schema_name,
918            create_if_not_exists,
919            options,
920        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
921            err_msg: "expected create database",
922        })?;
923
924        Ok(CreateDatabaseTask {
925            catalog: catalog_name,
926            schema: schema_name,
927            create_if_not_exists,
928            options,
929        })
930    }
931}
932
933impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
934    type Error = error::Error;
935
936    fn try_from(
937        CreateDatabaseTask {
938            catalog,
939            schema,
940            create_if_not_exists,
941            options,
942        }: CreateDatabaseTask,
943    ) -> Result<Self> {
944        Ok(PbCreateDatabaseTask {
945            create_database: Some(CreateDatabaseExpr {
946                catalog_name: catalog,
947                schema_name: schema,
948                create_if_not_exists,
949                options,
950            }),
951        })
952    }
953}
954
/// DDL task for `DROP DATABASE`.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    // Catalog the database belongs to.
    pub catalog: String,
    // Name of the database (schema) to drop.
    pub schema: String,
    // When true, dropping a non-existent database is not an error.
    pub drop_if_exists: bool,
}
961
962impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
963    type Error = error::Error;
964
965    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
966        let DropDatabaseExpr {
967            catalog_name,
968            schema_name,
969            drop_if_exists,
970        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
971            err_msg: "expected drop database",
972        })?;
973
974        Ok(DropDatabaseTask {
975            catalog: catalog_name,
976            schema: schema_name,
977            drop_if_exists,
978        })
979    }
980}
981
982impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
983    type Error = error::Error;
984
985    fn try_from(
986        DropDatabaseTask {
987            catalog,
988            schema,
989            drop_if_exists,
990        }: DropDatabaseTask,
991    ) -> Result<Self> {
992        Ok(PbDropDatabaseTask {
993            drop_database: Some(DropDatabaseExpr {
994                catalog_name: catalog,
995                schema_name: schema,
996                drop_if_exists,
997            }),
998        })
999    }
1000}
1001
/// DDL task for `ALTER DATABASE`, wrapping the raw alter expression.
/// Unlike the other database tasks, this one is not serde-serializable.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    // The alter-database expression carried by this task.
    pub alter_expr: AlterDatabaseExpr,
}
1006
1007impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1008    type Error = error::Error;
1009
1010    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1011        Ok(PbAlterDatabaseTask {
1012            task: Some(task.alter_expr),
1013        })
1014    }
1015}
1016
1017impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1018    type Error = error::Error;
1019
1020    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1021        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1022            err_msg: "expected alter database",
1023        })?;
1024
1025        Ok(AlterDatabaseTask { alter_expr })
1026    }
1027}
1028
1029impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1030    type Error = error::Error;
1031
1032    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1033        match pb {
1034            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1035                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1036                    options
1037                        .set_database_options
1038                        .into_iter()
1039                        .map(SetDatabaseOption::try_from)
1040                        .collect::<Result<Vec<_>>>()?,
1041                )))
1042            }
1043            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1044                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1045                    options
1046                        .keys
1047                        .iter()
1048                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1049                        .collect::<Result<Vec<_>>>()?,
1050                )),
1051            ),
1052        }
1053    }
1054}
1055
/// The only database option key currently recognized by SET/UNSET: "ttl".
const TTL_KEY: &str = "ttl";
1057
1058impl TryFrom<PbOption> for SetDatabaseOption {
1059    type Error = error::Error;
1060
1061    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1062        match key.to_ascii_lowercase().as_str() {
1063            TTL_KEY => {
1064                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1065                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1066
1067                Ok(SetDatabaseOption::Ttl(ttl))
1068            }
1069            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1070        }
1071    }
1072}
1073
/// A single database option being set by `ALTER DATABASE ... SET`.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    // Set the database's time-to-live.
    Ttl(DatabaseTimeToLive),
}
1078
/// A single database option being removed by `ALTER DATABASE ... UNSET`.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    // Remove the database's time-to-live.
    Ttl,
}
1083
1084impl TryFrom<&str> for UnsetDatabaseOption {
1085    type Error = error::Error;
1086
1087    fn try_from(key: &str) -> Result<Self> {
1088        match key.to_ascii_lowercase().as_str() {
1089            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1090            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1091        }
1092    }
1093}
1094
/// The full list of options set by one `ALTER DATABASE ... SET` statement.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1097
/// The full list of options removed by one `ALTER DATABASE ... UNSET` statement.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1100
/// The kind of change an `ALTER DATABASE` statement performs.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    // `ALTER DATABASE ... SET key = value, ...`
    SetDatabaseOptions(SetDatabaseOptions),
    // `ALTER DATABASE ... UNSET key, ...`
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1106
1107impl AlterDatabaseTask {
1108    pub fn catalog(&self) -> &str {
1109        &self.alter_expr.catalog_name
1110    }
1111
1112    pub fn schema(&self) -> &str {
1113        &self.alter_expr.catalog_name
1114    }
1115}
1116
/// DDL task for `CREATE FLOW`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    // Catalog the flow belongs to.
    pub catalog_name: String,
    // Name of the flow.
    pub flow_name: String,
    // Tables the flow reads from.
    pub source_table_names: Vec<TableName>,
    // Table the flow writes its results to.
    pub sink_table_name: TableName,
    // When true, replaces an existing flow of the same name.
    pub or_replace: bool,
    // When true, an already-existing flow is not an error.
    pub create_if_not_exists: bool,
    /// Duration in seconds. Data older than this duration will not be used.
    pub expire_after: Option<i64>,
    // Evaluation interval in seconds, when set.
    pub eval_interval_secs: Option<i64>,
    // Human-readable comment attached to the flow.
    pub comment: String,
    // The SQL defining the flow.
    pub sql: String,
    // Extra flow options as key-value pairs.
    pub flow_options: HashMap<String, String>,
}
1133
1134impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1135    type Error = error::Error;
1136
1137    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1138        let CreateFlowExpr {
1139            catalog_name,
1140            flow_name,
1141            source_table_names,
1142            sink_table_name,
1143            or_replace,
1144            create_if_not_exists,
1145            expire_after,
1146            eval_interval,
1147            comment,
1148            sql,
1149            flow_options,
1150        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1151            err_msg: "expected create_flow",
1152        })?;
1153
1154        Ok(CreateFlowTask {
1155            catalog_name,
1156            flow_name,
1157            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1158            sink_table_name: sink_table_name
1159                .context(error::InvalidProtoMsgSnafu {
1160                    err_msg: "expected sink_table_name",
1161                })?
1162                .into(),
1163            or_replace,
1164            create_if_not_exists,
1165            expire_after: expire_after.map(|e| e.value),
1166            eval_interval_secs: eval_interval.map(|e| e.seconds),
1167            comment,
1168            sql,
1169            flow_options,
1170        })
1171    }
1172}
1173
1174impl From<CreateFlowTask> for PbCreateFlowTask {
1175    fn from(
1176        CreateFlowTask {
1177            catalog_name,
1178            flow_name,
1179            source_table_names,
1180            sink_table_name,
1181            or_replace,
1182            create_if_not_exists,
1183            expire_after,
1184            eval_interval_secs: eval_interval,
1185            comment,
1186            sql,
1187            flow_options,
1188        }: CreateFlowTask,
1189    ) -> Self {
1190        PbCreateFlowTask {
1191            create_flow: Some(CreateFlowExpr {
1192                catalog_name,
1193                flow_name,
1194                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1195                sink_table_name: Some(sink_table_name.into()),
1196                or_replace,
1197                create_if_not_exists,
1198                expire_after: expire_after.map(|value| ExpireAfter { value }),
1199                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1200                comment,
1201                sql,
1202                flow_options,
1203            }),
1204        }
1205    }
1206}
1207
/// DDL task for `DROP FLOW`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    // Catalog the flow belongs to.
    pub catalog_name: String,
    // Name of the flow to drop.
    pub flow_name: String,
    // Id of the flow to drop.
    pub flow_id: FlowId,
    // When true, dropping a non-existent flow is not an error.
    pub drop_if_exists: bool,
}
1216
1217impl TryFrom<PbDropFlowTask> for DropFlowTask {
1218    type Error = error::Error;
1219
1220    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1221        let DropFlowExpr {
1222            catalog_name,
1223            flow_name,
1224            flow_id,
1225            drop_if_exists,
1226        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1227            err_msg: "expected drop_flow",
1228        })?;
1229        let flow_id = flow_id
1230            .context(error::InvalidProtoMsgSnafu {
1231                err_msg: "expected flow_id",
1232            })?
1233            .id;
1234        Ok(DropFlowTask {
1235            catalog_name,
1236            flow_name,
1237            flow_id,
1238            drop_if_exists,
1239        })
1240    }
1241}
1242
1243impl From<DropFlowTask> for PbDropFlowTask {
1244    fn from(
1245        DropFlowTask {
1246            catalog_name,
1247            flow_name,
1248            flow_id,
1249            drop_if_exists,
1250        }: DropFlowTask,
1251    ) -> Self {
1252        PbDropFlowTask {
1253            drop_flow: Some(DropFlowExpr {
1254                catalog_name,
1255                flow_name,
1256                flow_id: Some(api::v1::FlowId { id: flow_id }),
1257                drop_if_exists,
1258            }),
1259        }
1260    }
1261}
1262
/// Serializable snapshot of a session query context, carried alongside
/// DDL tasks (converted from a session `QueryContextRef`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    // Current catalog of the originating session.
    pub(crate) current_catalog: String,
    // Current schema of the originating session.
    pub(crate) current_schema: String,
    // Session timezone as a string.
    pub(crate) timezone: String,
    // Session extension key-value pairs.
    pub(crate) extensions: HashMap<String, String>,
    // Session channel, narrowed to u8 (widened back to u32 for protobuf).
    pub(crate) channel: u8,
}
1271
1272impl QueryContext {
1273    /// Get the current catalog
1274    pub fn current_catalog(&self) -> &str {
1275        &self.current_catalog
1276    }
1277
1278    /// Get the current schema
1279    pub fn current_schema(&self) -> &str {
1280        &self.current_schema
1281    }
1282
1283    /// Get the timezone
1284    pub fn timezone(&self) -> &str {
1285        &self.timezone
1286    }
1287
1288    /// Get the extensions
1289    pub fn extensions(&self) -> &HashMap<String, String> {
1290        &self.extensions
1291    }
1292
1293    /// Get the channel
1294    pub fn channel(&self) -> u8 {
1295        self.channel
1296    }
1297}
1298
/// Lightweight query context for flow operations containing only essential fields.
/// This is a subset of QueryContext that includes only the fields actually needed
/// for flow creation and execution.
///
/// `Deserialize` is implemented manually (below) so that payloads in the
/// full `QueryContext` format are also accepted for backward compatibility.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    /// Current catalog name - needed for flow metadata and recovery
    pub(crate) catalog: String,
    /// Current schema name - needed for table resolution during flow execution
    pub(crate) schema: String,
    /// Timezone for timestamp operations in the flow
    pub(crate) timezone: String,
}
1311
impl<'de> Deserialize<'de> for FlowQueryContext {
    /// Custom deserialization that accepts both the compact
    /// `FlowQueryContext` shape (`catalog`/`schema`/`timezone`) and the
    /// full `QueryContext` shape, so older serialized payloads still load.
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Support both QueryContext format and FlowQueryContext format
        // NOTE: `untagged` tries the variants in declaration order, so the
        // compact `Flow` shape is attempted before the full one.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Mirrors the fields of `FlowQueryContext`; needed because the outer
        // type implements `Deserialize` manually and cannot derive it here.
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
            }),
            // Full contexts are narrowed via the `From<QueryContext>` impl.
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1342
1343impl From<QueryContextRef> for QueryContext {
1344    fn from(query_context: QueryContextRef) -> Self {
1345        QueryContext {
1346            current_catalog: query_context.current_catalog().to_string(),
1347            current_schema: query_context.current_schema().to_string(),
1348            timezone: query_context.timezone().to_string(),
1349            extensions: query_context.extensions(),
1350            channel: query_context.channel() as u8,
1351        }
1352    }
1353}
1354
1355impl TryFrom<QueryContext> for session::context::QueryContext {
1356    type Error = error::Error;
1357    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1358        Ok(QueryContextBuilder::default()
1359            .current_catalog(value.current_catalog)
1360            .current_schema(value.current_schema)
1361            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1362            .extensions(value.extensions)
1363            .channel((value.channel as u32).into())
1364            .build())
1365    }
1366}
1367
1368impl From<QueryContext> for PbQueryContext {
1369    fn from(
1370        QueryContext {
1371            current_catalog,
1372            current_schema,
1373            timezone,
1374            extensions,
1375            channel,
1376        }: QueryContext,
1377    ) -> Self {
1378        PbQueryContext {
1379            current_catalog,
1380            current_schema,
1381            timezone,
1382            extensions,
1383            channel: channel as u32,
1384            snapshot_seqs: None,
1385            explain: None,
1386        }
1387    }
1388}
1389
1390impl From<QueryContext> for FlowQueryContext {
1391    fn from(ctx: QueryContext) -> Self {
1392        Self {
1393            catalog: ctx.current_catalog,
1394            schema: ctx.current_schema,
1395            timezone: ctx.timezone,
1396        }
1397    }
1398}
1399
1400impl From<QueryContextRef> for FlowQueryContext {
1401    fn from(ctx: QueryContextRef) -> Self {
1402        Self {
1403            catalog: ctx.current_catalog().to_string(),
1404            schema: ctx.current_schema().to_string(),
1405            timezone: ctx.timezone().to_string(),
1406        }
1407    }
1408}
1409
1410impl From<FlowQueryContext> for QueryContext {
1411    fn from(flow_ctx: FlowQueryContext) -> Self {
1412        Self {
1413            current_catalog: flow_ctx.catalog,
1414            current_schema: flow_ctx.schema,
1415            timezone: flow_ctx.timezone,
1416            extensions: HashMap::new(),
1417            channel: 0, // Use default channel for flows
1418        }
1419    }
1420}
1421
1422impl From<FlowQueryContext> for PbQueryContext {
1423    fn from(flow_ctx: FlowQueryContext) -> Self {
1424        let query_ctx: QueryContext = flow_ctx.into();
1425        query_ctx.into()
1426    }
1427}
1428
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask, *};

    // Round-trips a CreateTableTask through JSON serde.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // Round-trips an AlterTableTask through JSON serde.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    // Verifies that sort_columns reorders column defs by name and fixes up
    // the table-info indices (timestamp index, primary keys, values) to match.
    #[test]
    fn test_sort_columns() {
        // construct RawSchema
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        // construct RawTableMeta
        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        // construct RawTableInfo
        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // construct create table expr
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        // Call the sort_columns method
        create_table_task.sort_columns();

        // Assert that the columns are sorted correctly
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Assert that the table_info is updated correctly
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }

    // Narrowing QueryContext -> FlowQueryContext keeps catalog/schema/timezone
    // and drops extensions and channel.
    #[test]
    fn test_flow_query_context_conversion_from_query_context() {
        use std::collections::HashMap;
        let mut extensions = HashMap::new();
        extensions.insert("key1".to_string(), "value1".to_string());
        extensions.insert("key2".to_string(), "value2".to_string());

        let query_ctx = QueryContext {
            current_catalog: "test_catalog".to_string(),
            current_schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
            extensions,
            channel: 5,
        };

        let flow_ctx: FlowQueryContext = query_ctx.into();

        assert_eq!(flow_ctx.catalog, "test_catalog");
        assert_eq!(flow_ctx.schema, "test_schema");
        assert_eq!(flow_ctx.timezone, "UTC");
    }

    // Widening FlowQueryContext -> QueryContext fills defaults, and the
    // narrow->widen->narrow round trip is lossless for the flow fields.
    #[test]
    fn test_flow_query_context_conversion_to_query_context() {
        let flow_ctx = FlowQueryContext {
            catalog: "prod_catalog".to_string(),
            schema: "public".to_string(),
            timezone: "America/New_York".to_string(),
        };

        let query_ctx: QueryContext = flow_ctx.clone().into();

        assert_eq!(query_ctx.current_catalog, "prod_catalog");
        assert_eq!(query_ctx.current_schema, "public");
        assert_eq!(query_ctx.timezone, "America/New_York");
        assert!(query_ctx.extensions.is_empty());
        assert_eq!(query_ctx.channel, 0);

        // Test roundtrip conversion
        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
        assert_eq!(flow_ctx, flow_ctx_roundtrip);
    }

    // Building a FlowQueryContext directly from a session QueryContextRef.
    #[test]
    fn test_flow_query_context_conversion_from_query_context_ref() {
        use common_time::Timezone;
        use session::context::QueryContextBuilder;

        let session_ctx = QueryContextBuilder::default()
            .current_catalog("session_catalog".to_string())
            .current_schema("session_schema".to_string())
            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
            .build();

        let session_ctx_ref = Arc::new(session_ctx);
        let flow_ctx: FlowQueryContext = session_ctx_ref.into();

        assert_eq!(flow_ctx.catalog, "session_catalog");
        assert_eq!(flow_ctx.schema, "session_schema");
        assert_eq!(flow_ctx.timezone, "Europe/London");
    }

    // JSON round trip of FlowQueryContext, plus a check of the exact
    // serialized field names (which the custom Deserialize relies on).
    #[test]
    fn test_flow_query_context_serialization() {
        let flow_ctx = FlowQueryContext {
            catalog: "test_catalog".to_string(),
            schema: "test_schema".to_string(),
            timezone: "UTC".to_string(),
        };

        let serialized = serde_json::to_string(&flow_ctx).unwrap();
        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();

        assert_eq!(flow_ctx, deserialized);

        // Verify JSON structure
        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(json_value["catalog"], "test_catalog");
        assert_eq!(json_value["schema"], "test_schema");
        assert_eq!(json_value["timezone"], "UTC");
    }

    // FlowQueryContext -> PbQueryContext goes through QueryContext, so the
    // protobuf gets defaults for the fields flows do not carry.
    #[test]
    fn test_flow_query_context_conversion_to_pb() {
        let flow_ctx = FlowQueryContext {
            catalog: "pb_catalog".to_string(),
            schema: "pb_schema".to_string(),
            timezone: "Asia/Tokyo".to_string(),
        };

        let pb_ctx: PbQueryContext = flow_ctx.into();

        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
        assert_eq!(pb_ctx.current_schema, "pb_schema");
        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
        assert!(pb_ctx.extensions.is_empty());
        assert_eq!(pb_ctx.channel, 0);
        assert!(pb_ctx.snapshot_seqs.is_none());
        assert!(pb_ctx.explain.is_none());
    }
}