common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
27    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
28    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
29    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
30    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
31    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
32    DropViewTask as PbDropViewTask, Partition, ProcedureId,
33    TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
37    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
38    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
39};
40use base64::engine::general_purpose;
41use base64::Engine as _;
42use common_error::ext::BoxedError;
43use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use serde_with::{serde_as, DefaultOnNull};
47use session::context::{QueryContextBuilder, QueryContextRef};
48use snafu::{OptionExt, ResultExt};
49use table::metadata::{RawTableInfo, TableId};
50use table::table_name::TableName;
51use table::table_reference::TableReference;
52
53use crate::error::{
54    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
55    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
56};
57use crate::key::FlowId;
58
59/// DDL tasks
60#[derive(Debug, Clone)]
61pub enum DdlTask {
62    CreateTable(CreateTableTask),
63    DropTable(DropTableTask),
64    AlterTable(AlterTableTask),
65    TruncateTable(TruncateTableTask),
66    CreateLogicalTables(Vec<CreateTableTask>),
67    DropLogicalTables(Vec<DropTableTask>),
68    AlterLogicalTables(Vec<AlterTableTask>),
69    CreateDatabase(CreateDatabaseTask),
70    DropDatabase(DropDatabaseTask),
71    AlterDatabase(AlterDatabaseTask),
72    CreateFlow(CreateFlowTask),
73    DropFlow(DropFlowTask),
74    #[cfg(feature = "enterprise")]
75    DropTrigger(trigger::DropTriggerTask),
76    CreateView(CreateViewTask),
77    DropView(DropViewTask),
78    #[cfg(feature = "enterprise")]
79    CreateTrigger(trigger::CreateTriggerTask),
80}
81
82impl DdlTask {
83    /// Creates a [`DdlTask`] to create a flow.
84    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
85        DdlTask::CreateFlow(expr)
86    }
87
88    /// Creates a [`DdlTask`] to drop a flow.
89    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
90        DdlTask::DropFlow(expr)
91    }
92
93    /// Creates a [`DdlTask`] to drop a view.
94    pub fn new_drop_view(expr: DropViewTask) -> Self {
95        DdlTask::DropView(expr)
96    }
97
98    /// Creates a [`DdlTask`] to create a table.
99    pub fn new_create_table(
100        expr: CreateTableExpr,
101        partitions: Vec<Partition>,
102        table_info: RawTableInfo,
103    ) -> Self {
104        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
105    }
106
107    /// Creates a [`DdlTask`] to create several logical tables.
108    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
109        DdlTask::CreateLogicalTables(
110            table_data
111                .into_iter()
112                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
113                .collect(),
114        )
115    }
116
117    /// Creates a [`DdlTask`] to alter several logical tables.
118    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
119        DdlTask::AlterLogicalTables(
120            table_data
121                .into_iter()
122                .map(|alter_table| AlterTableTask { alter_table })
123                .collect(),
124        )
125    }
126
127    /// Creates a [`DdlTask`] to drop a table.
128    pub fn new_drop_table(
129        catalog: String,
130        schema: String,
131        table: String,
132        table_id: TableId,
133        drop_if_exists: bool,
134    ) -> Self {
135        DdlTask::DropTable(DropTableTask {
136            catalog,
137            schema,
138            table,
139            table_id,
140            drop_if_exists,
141        })
142    }
143
144    /// Creates a [`DdlTask`] to create a database.
145    pub fn new_create_database(
146        catalog: String,
147        schema: String,
148        create_if_not_exists: bool,
149        options: HashMap<String, String>,
150    ) -> Self {
151        DdlTask::CreateDatabase(CreateDatabaseTask {
152            catalog,
153            schema,
154            create_if_not_exists,
155            options,
156        })
157    }
158
159    /// Creates a [`DdlTask`] to drop a database.
160    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
161        DdlTask::DropDatabase(DropDatabaseTask {
162            catalog,
163            schema,
164            drop_if_exists,
165        })
166    }
167
168    /// Creates a [`DdlTask`] to alter a database.
169    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
170        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
171    }
172
173    /// Creates a [`DdlTask`] to alter a table.
174    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
175        DdlTask::AlterTable(AlterTableTask { alter_table })
176    }
177
178    /// Creates a [`DdlTask`] to truncate a table.
179    pub fn new_truncate_table(
180        catalog: String,
181        schema: String,
182        table: String,
183        table_id: TableId,
184        time_ranges: Vec<(Timestamp, Timestamp)>,
185    ) -> Self {
186        DdlTask::TruncateTable(TruncateTableTask {
187            catalog,
188            schema,
189            table,
190            table_id,
191            time_ranges,
192        })
193    }
194
195    /// Creates a [`DdlTask`] to create a view.
196    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
197        DdlTask::CreateView(CreateViewTask {
198            create_view,
199            view_info,
200        })
201    }
202}
203
204impl TryFrom<Task> for DdlTask {
205    type Error = error::Error;
206    fn try_from(task: Task) -> Result<Self> {
207        match task {
208            Task::CreateTableTask(create_table) => {
209                Ok(DdlTask::CreateTable(create_table.try_into()?))
210            }
211            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
212            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
213            Task::TruncateTableTask(truncate_table) => {
214                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
215            }
216            Task::CreateTableTasks(create_tables) => {
217                let tasks = create_tables
218                    .tasks
219                    .into_iter()
220                    .map(|task| task.try_into())
221                    .collect::<Result<Vec<_>>>()?;
222
223                Ok(DdlTask::CreateLogicalTables(tasks))
224            }
225            Task::DropTableTasks(drop_tables) => {
226                let tasks = drop_tables
227                    .tasks
228                    .into_iter()
229                    .map(|task| task.try_into())
230                    .collect::<Result<Vec<_>>>()?;
231
232                Ok(DdlTask::DropLogicalTables(tasks))
233            }
234            Task::AlterTableTasks(alter_tables) => {
235                let tasks = alter_tables
236                    .tasks
237                    .into_iter()
238                    .map(|task| task.try_into())
239                    .collect::<Result<Vec<_>>>()?;
240
241                Ok(DdlTask::AlterLogicalTables(tasks))
242            }
243            Task::CreateDatabaseTask(create_database) => {
244                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
245            }
246            Task::DropDatabaseTask(drop_database) => {
247                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
248            }
249            Task::AlterDatabaseTask(alter_database) => {
250                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
251            }
252            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
253            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
254            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
255            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
256            Task::CreateTriggerTask(create_trigger) => {
257                #[cfg(feature = "enterprise")]
258                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
259                #[cfg(not(feature = "enterprise"))]
260                {
261                    let _ = create_trigger;
262                    crate::error::UnsupportedSnafu {
263                        operation: "create trigger",
264                    }
265                    .fail()
266                }
267            }
268            Task::DropTriggerTask(drop_trigger) => {
269                #[cfg(feature = "enterprise")]
270                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
271                #[cfg(not(feature = "enterprise"))]
272                {
273                    let _ = drop_trigger;
274                    crate::error::UnsupportedSnafu {
275                        operation: "drop trigger",
276                    }
277                    .fail()
278                }
279            }
280        }
281    }
282}
283
284#[derive(Clone)]
285pub struct SubmitDdlTaskRequest {
286    pub query_context: QueryContextRef,
287    pub task: DdlTask,
288}
289
290impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
291    type Error = error::Error;
292
293    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
294        let task = match request.task {
295            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
296            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
297            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
298            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
299            DdlTask::CreateLogicalTables(tasks) => {
300                let tasks = tasks
301                    .into_iter()
302                    .map(|task| task.try_into())
303                    .collect::<Result<Vec<_>>>()?;
304
305                Task::CreateTableTasks(PbCreateTableTasks { tasks })
306            }
307            DdlTask::DropLogicalTables(tasks) => {
308                let tasks = tasks
309                    .into_iter()
310                    .map(|task| task.into())
311                    .collect::<Vec<_>>();
312
313                Task::DropTableTasks(PbDropTableTasks { tasks })
314            }
315            DdlTask::AlterLogicalTables(tasks) => {
316                let tasks = tasks
317                    .into_iter()
318                    .map(|task| task.try_into())
319                    .collect::<Result<Vec<_>>>()?;
320
321                Task::AlterTableTasks(PbAlterTableTasks { tasks })
322            }
323            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
324            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
325            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
326            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
327            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
328            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
329            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
330            #[cfg(feature = "enterprise")]
331            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
332            #[cfg(feature = "enterprise")]
333            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
334        };
335
336        Ok(Self {
337            header: None,
338            query_context: Some((*request.query_context).clone().into()),
339            task: Some(task),
340        })
341    }
342}
343
344#[derive(Debug, Default)]
345pub struct SubmitDdlTaskResponse {
346    pub key: Vec<u8>,
347    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
348    pub table_ids: Vec<TableId>,
349}
350
351impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
352    type Error = error::Error;
353
354    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
355        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
356        Ok(Self {
357            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
358            table_ids,
359        })
360    }
361}
362
363impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
364    fn from(val: SubmitDdlTaskResponse) -> Self {
365        Self {
366            pid: Some(ProcedureId { key: val.key }),
367            table_ids: val
368                .table_ids
369                .into_iter()
370                .map(|id| api::v1::TableId { id })
371                .collect(),
372            ..Default::default()
373        }
374    }
375}
376
377/// A `CREATE VIEW` task.
378#[derive(Debug, PartialEq, Clone)]
379pub struct CreateViewTask {
380    pub create_view: CreateViewExpr,
381    pub view_info: RawTableInfo,
382}
383
384impl CreateViewTask {
385    /// Returns the [`TableReference`] of view.
386    pub fn table_ref(&self) -> TableReference {
387        TableReference {
388            catalog: &self.create_view.catalog_name,
389            schema: &self.create_view.schema_name,
390            table: &self.create_view.view_name,
391        }
392    }
393
394    /// Returns the encoded logical plan
395    pub fn raw_logical_plan(&self) -> &Vec<u8> {
396        &self.create_view.logical_plan
397    }
398
399    /// Returns the view definition in SQL
400    pub fn view_definition(&self) -> &str {
401        &self.create_view.definition
402    }
403
404    /// Returns the resolved table names in view's logical plan
405    pub fn table_names(&self) -> HashSet<TableName> {
406        self.create_view
407            .table_names
408            .iter()
409            .map(|t| t.clone().into())
410            .collect()
411    }
412
413    /// Returns the view's columns
414    pub fn columns(&self) -> &Vec<String> {
415        &self.create_view.columns
416    }
417
418    /// Returns the original logical plan's columns
419    pub fn plan_columns(&self) -> &Vec<String> {
420        &self.create_view.plan_columns
421    }
422}
423
424impl TryFrom<PbCreateViewTask> for CreateViewTask {
425    type Error = error::Error;
426
427    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
428        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
429
430        Ok(CreateViewTask {
431            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
432                err_msg: "expected create view",
433            })?,
434            view_info,
435        })
436    }
437}
438
439impl TryFrom<CreateViewTask> for PbCreateViewTask {
440    type Error = error::Error;
441
442    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
443        Ok(PbCreateViewTask {
444            create_view: Some(task.create_view),
445            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
446        })
447    }
448}
449
450impl Serialize for CreateViewTask {
451    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
452    where
453        S: serde::Serializer,
454    {
455        let view_info = serde_json::to_vec(&self.view_info)
456            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
457
458        let pb = PbCreateViewTask {
459            create_view: Some(self.create_view.clone()),
460            view_info,
461        };
462        let buf = pb.encode_to_vec();
463        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
464        serializer.serialize_str(&encoded)
465    }
466}
467
468impl<'de> Deserialize<'de> for CreateViewTask {
469    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
470    where
471        D: serde::Deserializer<'de>,
472    {
473        let encoded = String::deserialize(deserializer)?;
474        let buf = general_purpose::STANDARD_NO_PAD
475            .decode(encoded)
476            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
477        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
478            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
479
480        let expr = CreateViewTask::try_from(expr)
481            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
482
483        Ok(expr)
484    }
485}
486
487/// A `DROP VIEW` task.
488#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
489pub struct DropViewTask {
490    pub catalog: String,
491    pub schema: String,
492    pub view: String,
493    pub view_id: TableId,
494    pub drop_if_exists: bool,
495}
496
497impl DropViewTask {
498    /// Returns the [`TableReference`] of view.
499    pub fn table_ref(&self) -> TableReference {
500        TableReference {
501            catalog: &self.catalog,
502            schema: &self.schema,
503            table: &self.view,
504        }
505    }
506}
507
508impl TryFrom<PbDropViewTask> for DropViewTask {
509    type Error = error::Error;
510
511    fn try_from(pb: PbDropViewTask) -> Result<Self> {
512        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
513            err_msg: "expected drop view",
514        })?;
515
516        Ok(DropViewTask {
517            catalog: expr.catalog_name,
518            schema: expr.schema_name,
519            view: expr.view_name,
520            view_id: expr
521                .view_id
522                .context(error::InvalidProtoMsgSnafu {
523                    err_msg: "expected view_id",
524                })?
525                .id,
526            drop_if_exists: expr.drop_if_exists,
527        })
528    }
529}
530
531impl From<DropViewTask> for PbDropViewTask {
532    fn from(task: DropViewTask) -> Self {
533        PbDropViewTask {
534            drop_view: Some(DropViewExpr {
535                catalog_name: task.catalog,
536                schema_name: task.schema,
537                view_name: task.view,
538                view_id: Some(api::v1::TableId { id: task.view_id }),
539                drop_if_exists: task.drop_if_exists,
540            }),
541        }
542    }
543}
544
545#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
546pub struct DropTableTask {
547    pub catalog: String,
548    pub schema: String,
549    pub table: String,
550    pub table_id: TableId,
551    #[serde(default)]
552    pub drop_if_exists: bool,
553}
554
555impl DropTableTask {
556    pub fn table_ref(&self) -> TableReference {
557        TableReference {
558            catalog: &self.catalog,
559            schema: &self.schema,
560            table: &self.table,
561        }
562    }
563
564    pub fn table_name(&self) -> TableName {
565        TableName {
566            catalog_name: self.catalog.to_string(),
567            schema_name: self.schema.to_string(),
568            table_name: self.table.to_string(),
569        }
570    }
571}
572
573impl TryFrom<PbDropTableTask> for DropTableTask {
574    type Error = error::Error;
575
576    fn try_from(pb: PbDropTableTask) -> Result<Self> {
577        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
578            err_msg: "expected drop table",
579        })?;
580
581        Ok(Self {
582            catalog: drop_table.catalog_name,
583            schema: drop_table.schema_name,
584            table: drop_table.table_name,
585            table_id: drop_table
586                .table_id
587                .context(error::InvalidProtoMsgSnafu {
588                    err_msg: "expected table_id",
589                })?
590                .id,
591            drop_if_exists: drop_table.drop_if_exists,
592        })
593    }
594}
595
596impl From<DropTableTask> for PbDropTableTask {
597    fn from(task: DropTableTask) -> Self {
598        PbDropTableTask {
599            drop_table: Some(DropTableExpr {
600                catalog_name: task.catalog,
601                schema_name: task.schema,
602                table_name: task.table,
603                table_id: Some(api::v1::TableId { id: task.table_id }),
604                drop_if_exists: task.drop_if_exists,
605            }),
606        }
607    }
608}
609
610#[derive(Debug, PartialEq, Clone)]
611pub struct CreateTableTask {
612    pub create_table: CreateTableExpr,
613    pub partitions: Vec<Partition>,
614    pub table_info: RawTableInfo,
615}
616
617impl TryFrom<PbCreateTableTask> for CreateTableTask {
618    type Error = error::Error;
619
620    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
621        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
622
623        Ok(CreateTableTask::new(
624            pb.create_table.context(error::InvalidProtoMsgSnafu {
625                err_msg: "expected create table",
626            })?,
627            pb.partitions,
628            table_info,
629        ))
630    }
631}
632
633impl TryFrom<CreateTableTask> for PbCreateTableTask {
634    type Error = error::Error;
635
636    fn try_from(task: CreateTableTask) -> Result<Self> {
637        Ok(PbCreateTableTask {
638            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
639            create_table: Some(task.create_table),
640            partitions: task.partitions,
641        })
642    }
643}
644
645impl CreateTableTask {
646    pub fn new(
647        expr: CreateTableExpr,
648        partitions: Vec<Partition>,
649        table_info: RawTableInfo,
650    ) -> CreateTableTask {
651        CreateTableTask {
652            create_table: expr,
653            partitions,
654            table_info,
655        }
656    }
657
658    pub fn table_name(&self) -> TableName {
659        let table = &self.create_table;
660
661        TableName {
662            catalog_name: table.catalog_name.to_string(),
663            schema_name: table.schema_name.to_string(),
664            table_name: table.table_name.to_string(),
665        }
666    }
667
668    pub fn table_ref(&self) -> TableReference {
669        let table = &self.create_table;
670
671        TableReference {
672            catalog: &table.catalog_name,
673            schema: &table.schema_name,
674            table: &table.table_name,
675        }
676    }
677
678    /// Sets the `table_info`'s table_id.
679    pub fn set_table_id(&mut self, table_id: TableId) {
680        self.table_info.ident.table_id = table_id;
681    }
682
683    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
684    ///
685    /// This function won't do any check or verification. Caller should
686    /// ensure this task is valid.
687    pub fn sort_columns(&mut self) {
688        // sort create table expr
689        // sort column_defs by name
690        self.create_table
691            .column_defs
692            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
693
694        self.table_info.sort_columns();
695    }
696}
697
698impl Serialize for CreateTableTask {
699    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
700    where
701        S: serde::Serializer,
702    {
703        let table_info = serde_json::to_vec(&self.table_info)
704            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
705
706        let pb = PbCreateTableTask {
707            create_table: Some(self.create_table.clone()),
708            partitions: self.partitions.clone(),
709            table_info,
710        };
711        let buf = pb.encode_to_vec();
712        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
713        serializer.serialize_str(&encoded)
714    }
715}
716
717impl<'de> Deserialize<'de> for CreateTableTask {
718    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
719    where
720        D: serde::Deserializer<'de>,
721    {
722        let encoded = String::deserialize(deserializer)?;
723        let buf = general_purpose::STANDARD_NO_PAD
724            .decode(encoded)
725            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
726        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
727            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
728
729        let expr = CreateTableTask::try_from(expr)
730            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
731
732        Ok(expr)
733    }
734}
735
736#[derive(Debug, PartialEq, Clone)]
737pub struct AlterTableTask {
738    // TODO(CookiePieWw): Replace proto struct with user-defined struct
739    pub alter_table: AlterTableExpr,
740}
741
742impl AlterTableTask {
743    pub fn validate(&self) -> Result<()> {
744        self.alter_table
745            .kind
746            .as_ref()
747            .context(error::UnexpectedSnafu {
748                err_msg: "'kind' is absent",
749            })?;
750        Ok(())
751    }
752
753    pub fn table_ref(&self) -> TableReference {
754        TableReference {
755            catalog: &self.alter_table.catalog_name,
756            schema: &self.alter_table.schema_name,
757            table: &self.alter_table.table_name,
758        }
759    }
760
761    pub fn table_name(&self) -> TableName {
762        let table = &self.alter_table;
763
764        TableName {
765            catalog_name: table.catalog_name.to_string(),
766            schema_name: table.schema_name.to_string(),
767            table_name: table.table_name.to_string(),
768        }
769    }
770}
771
772impl TryFrom<PbAlterTableTask> for AlterTableTask {
773    type Error = error::Error;
774
775    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
776        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
777            err_msg: "expected alter_table",
778        })?;
779
780        Ok(AlterTableTask { alter_table })
781    }
782}
783
784impl TryFrom<AlterTableTask> for PbAlterTableTask {
785    type Error = error::Error;
786
787    fn try_from(task: AlterTableTask) -> Result<Self> {
788        Ok(PbAlterTableTask {
789            alter_table: Some(task.alter_table),
790        })
791    }
792}
793
794impl Serialize for AlterTableTask {
795    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
796    where
797        S: serde::Serializer,
798    {
799        let pb = PbAlterTableTask {
800            alter_table: Some(self.alter_table.clone()),
801        };
802        let buf = pb.encode_to_vec();
803        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
804        serializer.serialize_str(&encoded)
805    }
806}
807
808impl<'de> Deserialize<'de> for AlterTableTask {
809    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
810    where
811        D: serde::Deserializer<'de>,
812    {
813        let encoded = String::deserialize(deserializer)?;
814        let buf = general_purpose::STANDARD_NO_PAD
815            .decode(encoded)
816            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
817        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
818            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
819
820        let expr = AlterTableTask::try_from(expr)
821            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
822
823        Ok(expr)
824    }
825}
826
827#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
828pub struct TruncateTableTask {
829    pub catalog: String,
830    pub schema: String,
831    pub table: String,
832    pub table_id: TableId,
833    pub time_ranges: Vec<(Timestamp, Timestamp)>,
834}
835
836impl TruncateTableTask {
837    pub fn table_ref(&self) -> TableReference {
838        TableReference {
839            catalog: &self.catalog,
840            schema: &self.schema,
841            table: &self.table,
842        }
843    }
844
845    pub fn table_name(&self) -> TableName {
846        TableName {
847            catalog_name: self.catalog.to_string(),
848            schema_name: self.schema.to_string(),
849            table_name: self.table.to_string(),
850        }
851    }
852}
853
854impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
855    type Error = error::Error;
856
857    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
858        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
859            err_msg: "expected truncate table",
860        })?;
861
862        Ok(Self {
863            catalog: truncate_table.catalog_name,
864            schema: truncate_table.schema_name,
865            table: truncate_table.table_name,
866            table_id: truncate_table
867                .table_id
868                .context(error::InvalidProtoMsgSnafu {
869                    err_msg: "expected table_id",
870                })?
871                .id,
872            time_ranges: truncate_table
873                .time_ranges
874                .map(from_pb_time_ranges)
875                .transpose()
876                .map_err(BoxedError::new)
877                .context(ExternalSnafu)?
878                .unwrap_or_default(),
879        })
880    }
881}
882
883impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
884    type Error = error::Error;
885
886    fn try_from(task: TruncateTableTask) -> Result<Self> {
887        Ok(PbTruncateTableTask {
888            truncate_table: Some(TruncateTableExpr {
889                catalog_name: task.catalog,
890                schema_name: task.schema,
891                table_name: task.table,
892                table_id: Some(api::v1::TableId { id: task.table_id }),
893                time_ranges: Some(
894                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
895                ),
896            }),
897        })
898    }
899}
900
901#[serde_as]
902#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
903pub struct CreateDatabaseTask {
904    pub catalog: String,
905    pub schema: String,
906    pub create_if_not_exists: bool,
907    #[serde_as(deserialize_as = "DefaultOnNull")]
908    pub options: HashMap<String, String>,
909}
910
911impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
912    type Error = error::Error;
913
914    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
915        let CreateDatabaseExpr {
916            catalog_name,
917            schema_name,
918            create_if_not_exists,
919            options,
920        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
921            err_msg: "expected create database",
922        })?;
923
924        Ok(CreateDatabaseTask {
925            catalog: catalog_name,
926            schema: schema_name,
927            create_if_not_exists,
928            options,
929        })
930    }
931}
932
933impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
934    type Error = error::Error;
935
936    fn try_from(
937        CreateDatabaseTask {
938            catalog,
939            schema,
940            create_if_not_exists,
941            options,
942        }: CreateDatabaseTask,
943    ) -> Result<Self> {
944        Ok(PbCreateDatabaseTask {
945            create_database: Some(CreateDatabaseExpr {
946                catalog_name: catalog,
947                schema_name: schema,
948                create_if_not_exists,
949                options,
950            }),
951        })
952    }
953}
954
955#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
956pub struct DropDatabaseTask {
957    pub catalog: String,
958    pub schema: String,
959    pub drop_if_exists: bool,
960}
961
962impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
963    type Error = error::Error;
964
965    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
966        let DropDatabaseExpr {
967            catalog_name,
968            schema_name,
969            drop_if_exists,
970        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
971            err_msg: "expected drop database",
972        })?;
973
974        Ok(DropDatabaseTask {
975            catalog: catalog_name,
976            schema: schema_name,
977            drop_if_exists,
978        })
979    }
980}
981
982impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
983    type Error = error::Error;
984
985    fn try_from(
986        DropDatabaseTask {
987            catalog,
988            schema,
989            drop_if_exists,
990        }: DropDatabaseTask,
991    ) -> Result<Self> {
992        Ok(PbDropDatabaseTask {
993            drop_database: Some(DropDatabaseExpr {
994                catalog_name: catalog,
995                schema_name: schema,
996                drop_if_exists,
997            }),
998        })
999    }
1000}
1001
1002#[derive(Debug, PartialEq, Clone)]
1003pub struct AlterDatabaseTask {
1004    pub alter_expr: AlterDatabaseExpr,
1005}
1006
1007impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1008    type Error = error::Error;
1009
1010    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1011        Ok(PbAlterDatabaseTask {
1012            task: Some(task.alter_expr),
1013        })
1014    }
1015}
1016
1017impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1018    type Error = error::Error;
1019
1020    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1021        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1022            err_msg: "expected alter database",
1023        })?;
1024
1025        Ok(AlterDatabaseTask { alter_expr })
1026    }
1027}
1028
1029impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1030    type Error = error::Error;
1031
1032    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1033        match pb {
1034            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1035                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1036                    options
1037                        .set_database_options
1038                        .into_iter()
1039                        .map(SetDatabaseOption::try_from)
1040                        .collect::<Result<Vec<_>>>()?,
1041                )))
1042            }
1043            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1044                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1045                    options
1046                        .keys
1047                        .iter()
1048                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1049                        .collect::<Result<Vec<_>>>()?,
1050                )),
1051            ),
1052        }
1053    }
1054}
1055
1056const TTL_KEY: &str = "ttl";
1057
1058impl TryFrom<PbOption> for SetDatabaseOption {
1059    type Error = error::Error;
1060
1061    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1062        match key.to_ascii_lowercase().as_str() {
1063            TTL_KEY => {
1064                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1065                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1066
1067                Ok(SetDatabaseOption::Ttl(ttl))
1068            }
1069            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1070        }
1071    }
1072}
1073
1074#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1075pub enum SetDatabaseOption {
1076    Ttl(DatabaseTimeToLive),
1077}
1078
1079#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1080pub enum UnsetDatabaseOption {
1081    Ttl,
1082}
1083
1084impl TryFrom<&str> for UnsetDatabaseOption {
1085    type Error = error::Error;
1086
1087    fn try_from(key: &str) -> Result<Self> {
1088        match key.to_ascii_lowercase().as_str() {
1089            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1090            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1091        }
1092    }
1093}
1094
1095#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1096pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1097
1098#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1099pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1100
1101#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1102pub enum AlterDatabaseKind {
1103    SetDatabaseOptions(SetDatabaseOptions),
1104    UnsetDatabaseOptions(UnsetDatabaseOptions),
1105}
1106
1107impl AlterDatabaseTask {
1108    pub fn catalog(&self) -> &str {
1109        &self.alter_expr.catalog_name
1110    }
1111
1112    pub fn schema(&self) -> &str {
1113        &self.alter_expr.catalog_name
1114    }
1115}
1116
1117/// Create flow
1118#[derive(Debug, Clone, Serialize, Deserialize)]
1119pub struct CreateFlowTask {
1120    pub catalog_name: String,
1121    pub flow_name: String,
1122    pub source_table_names: Vec<TableName>,
1123    pub sink_table_name: TableName,
1124    pub or_replace: bool,
1125    pub create_if_not_exists: bool,
1126    /// Duration in seconds. Data older than this duration will not be used.
1127    pub expire_after: Option<i64>,
1128    pub comment: String,
1129    pub sql: String,
1130    pub flow_options: HashMap<String, String>,
1131}
1132
1133impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1134    type Error = error::Error;
1135
1136    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1137        let CreateFlowExpr {
1138            catalog_name,
1139            flow_name,
1140            source_table_names,
1141            sink_table_name,
1142            or_replace,
1143            create_if_not_exists,
1144            expire_after,
1145            comment,
1146            sql,
1147            flow_options,
1148        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1149            err_msg: "expected create_flow",
1150        })?;
1151
1152        Ok(CreateFlowTask {
1153            catalog_name,
1154            flow_name,
1155            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1156            sink_table_name: sink_table_name
1157                .context(error::InvalidProtoMsgSnafu {
1158                    err_msg: "expected sink_table_name",
1159                })?
1160                .into(),
1161            or_replace,
1162            create_if_not_exists,
1163            expire_after: expire_after.map(|e| e.value),
1164            comment,
1165            sql,
1166            flow_options,
1167        })
1168    }
1169}
1170
1171impl From<CreateFlowTask> for PbCreateFlowTask {
1172    fn from(
1173        CreateFlowTask {
1174            catalog_name,
1175            flow_name,
1176            source_table_names,
1177            sink_table_name,
1178            or_replace,
1179            create_if_not_exists,
1180            expire_after,
1181            comment,
1182            sql,
1183            flow_options,
1184        }: CreateFlowTask,
1185    ) -> Self {
1186        PbCreateFlowTask {
1187            create_flow: Some(CreateFlowExpr {
1188                catalog_name,
1189                flow_name,
1190                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1191                sink_table_name: Some(sink_table_name.into()),
1192                or_replace,
1193                create_if_not_exists,
1194                expire_after: expire_after.map(|value| ExpireAfter { value }),
1195                comment,
1196                sql,
1197                flow_options,
1198            }),
1199        }
1200    }
1201}
1202
1203/// Drop flow
1204#[derive(Debug, Clone, Serialize, Deserialize)]
1205pub struct DropFlowTask {
1206    pub catalog_name: String,
1207    pub flow_name: String,
1208    pub flow_id: FlowId,
1209    pub drop_if_exists: bool,
1210}
1211
1212impl TryFrom<PbDropFlowTask> for DropFlowTask {
1213    type Error = error::Error;
1214
1215    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1216        let DropFlowExpr {
1217            catalog_name,
1218            flow_name,
1219            flow_id,
1220            drop_if_exists,
1221        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1222            err_msg: "expected drop_flow",
1223        })?;
1224        let flow_id = flow_id
1225            .context(error::InvalidProtoMsgSnafu {
1226                err_msg: "expected flow_id",
1227            })?
1228            .id;
1229        Ok(DropFlowTask {
1230            catalog_name,
1231            flow_name,
1232            flow_id,
1233            drop_if_exists,
1234        })
1235    }
1236}
1237
1238impl From<DropFlowTask> for PbDropFlowTask {
1239    fn from(
1240        DropFlowTask {
1241            catalog_name,
1242            flow_name,
1243            flow_id,
1244            drop_if_exists,
1245        }: DropFlowTask,
1246    ) -> Self {
1247        PbDropFlowTask {
1248            drop_flow: Some(DropFlowExpr {
1249                catalog_name,
1250                flow_name,
1251                flow_id: Some(api::v1::FlowId { id: flow_id }),
1252                drop_if_exists,
1253            }),
1254        }
1255    }
1256}
1257
1258#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1259pub struct QueryContext {
1260    pub(crate) current_catalog: String,
1261    pub(crate) current_schema: String,
1262    pub(crate) timezone: String,
1263    pub(crate) extensions: HashMap<String, String>,
1264    pub(crate) channel: u8,
1265}
1266
1267impl QueryContext {
1268    /// Get the current catalog
1269    pub fn current_catalog(&self) -> &str {
1270        &self.current_catalog
1271    }
1272
1273    /// Get the current schema
1274    pub fn current_schema(&self) -> &str {
1275        &self.current_schema
1276    }
1277
1278    /// Get the timezone
1279    pub fn timezone(&self) -> &str {
1280        &self.timezone
1281    }
1282
1283    /// Get the extensions
1284    pub fn extensions(&self) -> &HashMap<String, String> {
1285        &self.extensions
1286    }
1287
1288    /// Get the channel
1289    pub fn channel(&self) -> u8 {
1290        self.channel
1291    }
1292}
1293
1294/// Lightweight query context for flow operations containing only essential fields.
1295/// This is a subset of QueryContext that includes only the fields actually needed
1296/// for flow creation and execution.
1297#[derive(Debug, Clone, Serialize, PartialEq)]
1298pub struct FlowQueryContext {
1299    /// Current catalog name - needed for flow metadata and recovery
1300    pub(crate) catalog: String,
1301    /// Current schema name - needed for table resolution during flow execution
1302    pub(crate) schema: String,
1303    /// Timezone for timestamp operations in the flow
1304    pub(crate) timezone: String,
1305}
1306
1307impl<'de> Deserialize<'de> for FlowQueryContext {
1308    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1309    where
1310        D: serde::Deserializer<'de>,
1311    {
1312        // Support both QueryContext format and FlowQueryContext format
1313        #[derive(Deserialize)]
1314        #[serde(untagged)]
1315        enum ContextCompat {
1316            Flow(FlowQueryContextHelper),
1317            Full(QueryContext),
1318        }
1319
1320        #[derive(Deserialize)]
1321        struct FlowQueryContextHelper {
1322            catalog: String,
1323            schema: String,
1324            timezone: String,
1325        }
1326
1327        match ContextCompat::deserialize(deserializer)? {
1328            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1329                catalog: helper.catalog,
1330                schema: helper.schema,
1331                timezone: helper.timezone,
1332            }),
1333            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1334        }
1335    }
1336}
1337
1338impl From<QueryContextRef> for QueryContext {
1339    fn from(query_context: QueryContextRef) -> Self {
1340        QueryContext {
1341            current_catalog: query_context.current_catalog().to_string(),
1342            current_schema: query_context.current_schema().to_string(),
1343            timezone: query_context.timezone().to_string(),
1344            extensions: query_context.extensions(),
1345            channel: query_context.channel() as u8,
1346        }
1347    }
1348}
1349
1350impl TryFrom<QueryContext> for session::context::QueryContext {
1351    type Error = error::Error;
1352    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1353        Ok(QueryContextBuilder::default()
1354            .current_catalog(value.current_catalog)
1355            .current_schema(value.current_schema)
1356            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1357            .extensions(value.extensions)
1358            .channel((value.channel as u32).into())
1359            .build())
1360    }
1361}
1362
1363impl From<QueryContext> for PbQueryContext {
1364    fn from(
1365        QueryContext {
1366            current_catalog,
1367            current_schema,
1368            timezone,
1369            extensions,
1370            channel,
1371        }: QueryContext,
1372    ) -> Self {
1373        PbQueryContext {
1374            current_catalog,
1375            current_schema,
1376            timezone,
1377            extensions,
1378            channel: channel as u32,
1379            snapshot_seqs: None,
1380            explain: None,
1381        }
1382    }
1383}
1384
1385impl From<QueryContext> for FlowQueryContext {
1386    fn from(ctx: QueryContext) -> Self {
1387        Self {
1388            catalog: ctx.current_catalog,
1389            schema: ctx.current_schema,
1390            timezone: ctx.timezone,
1391        }
1392    }
1393}
1394
1395impl From<QueryContextRef> for FlowQueryContext {
1396    fn from(ctx: QueryContextRef) -> Self {
1397        Self {
1398            catalog: ctx.current_catalog().to_string(),
1399            schema: ctx.current_schema().to_string(),
1400            timezone: ctx.timezone().to_string(),
1401        }
1402    }
1403}
1404
1405impl From<FlowQueryContext> for QueryContext {
1406    fn from(flow_ctx: FlowQueryContext) -> Self {
1407        Self {
1408            current_catalog: flow_ctx.catalog,
1409            current_schema: flow_ctx.schema,
1410            timezone: flow_ctx.timezone,
1411            extensions: HashMap::new(),
1412            channel: 0, // Use default channel for flows
1413        }
1414    }
1415}
1416
1417impl From<FlowQueryContext> for PbQueryContext {
1418    fn from(flow_ctx: FlowQueryContext) -> Self {
1419        let query_ctx: QueryContext = flow_ctx.into();
1420        query_ctx.into()
1421    }
1422}
1423
1424#[cfg(test)]
1425mod tests {
1426    use std::sync::Arc;
1427
1428    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1429    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1430    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1431    use store_api::storage::ConcreteDataType;
1432    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1433    use table::test_util::table_info::test_table_info;
1434
1435    use super::{AlterTableTask, CreateTableTask, *};
1436
1437    #[test]
1438    fn test_basic_ser_de_create_table_task() {
1439        let schema = SchemaBuilder::default().build().unwrap();
1440        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1441        let task = CreateTableTask::new(
1442            CreateTableExpr::default(),
1443            Vec::new(),
1444            RawTableInfo::from(table_info),
1445        );
1446
1447        let output = serde_json::to_vec(&task).unwrap();
1448
1449        let de = serde_json::from_slice(&output).unwrap();
1450        assert_eq!(task, de);
1451    }
1452
1453    #[test]
1454    fn test_basic_ser_de_alter_table_task() {
1455        let task = AlterTableTask {
1456            alter_table: AlterTableExpr::default(),
1457        };
1458
1459        let output = serde_json::to_vec(&task).unwrap();
1460
1461        let de = serde_json::from_slice(&output).unwrap();
1462        assert_eq!(task, de);
1463    }
1464
1465    #[test]
1466    fn test_sort_columns() {
1467        // construct RawSchema
1468        let raw_schema = RawSchema {
1469            column_schemas: vec![
1470                ColumnSchema::new(
1471                    "column3".to_string(),
1472                    ConcreteDataType::string_datatype(),
1473                    true,
1474                ),
1475                ColumnSchema::new(
1476                    "column1".to_string(),
1477                    ConcreteDataType::timestamp_millisecond_datatype(),
1478                    false,
1479                )
1480                .with_time_index(true),
1481                ColumnSchema::new(
1482                    "column2".to_string(),
1483                    ConcreteDataType::float64_datatype(),
1484                    true,
1485                ),
1486            ],
1487            timestamp_index: Some(1),
1488            version: 0,
1489        };
1490
1491        // construct RawTableMeta
1492        let raw_table_meta = RawTableMeta {
1493            schema: raw_schema,
1494            primary_key_indices: vec![0],
1495            value_indices: vec![2],
1496            engine: METRIC_ENGINE_NAME.to_string(),
1497            next_column_id: 0,
1498            region_numbers: vec![0],
1499            options: Default::default(),
1500            created_on: Default::default(),
1501            partition_key_indices: Default::default(),
1502            column_ids: Default::default(),
1503        };
1504
1505        // construct RawTableInfo
1506        let raw_table_info = RawTableInfo {
1507            ident: Default::default(),
1508            meta: raw_table_meta,
1509            name: Default::default(),
1510            desc: Default::default(),
1511            catalog_name: Default::default(),
1512            schema_name: Default::default(),
1513            table_type: TableType::Base,
1514        };
1515
1516        // construct create table expr
1517        let create_table_expr = CreateTableExpr {
1518            column_defs: vec![
1519                ColumnDef {
1520                    name: "column3".to_string(),
1521                    semantic_type: SemanticType::Tag as i32,
1522                    ..Default::default()
1523                },
1524                ColumnDef {
1525                    name: "column1".to_string(),
1526                    semantic_type: SemanticType::Timestamp as i32,
1527                    ..Default::default()
1528                },
1529                ColumnDef {
1530                    name: "column2".to_string(),
1531                    semantic_type: SemanticType::Field as i32,
1532                    ..Default::default()
1533                },
1534            ],
1535            primary_keys: vec!["column3".to_string()],
1536            ..Default::default()
1537        };
1538
1539        let mut create_table_task =
1540            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1541
1542        // Call the sort_columns method
1543        create_table_task.sort_columns();
1544
1545        // Assert that the columns are sorted correctly
1546        assert_eq!(
1547            create_table_task.create_table.column_defs[0].name,
1548            "column1".to_string()
1549        );
1550        assert_eq!(
1551            create_table_task.create_table.column_defs[1].name,
1552            "column2".to_string()
1553        );
1554        assert_eq!(
1555            create_table_task.create_table.column_defs[2].name,
1556            "column3".to_string()
1557        );
1558
1559        // Assert that the table_info is updated correctly
1560        assert_eq!(
1561            create_table_task.table_info.meta.schema.timestamp_index,
1562            Some(0)
1563        );
1564        assert_eq!(
1565            create_table_task.table_info.meta.primary_key_indices,
1566            vec![2]
1567        );
1568        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1569    }
1570
1571    #[test]
1572    fn test_flow_query_context_conversion_from_query_context() {
1573        use std::collections::HashMap;
1574        let mut extensions = HashMap::new();
1575        extensions.insert("key1".to_string(), "value1".to_string());
1576        extensions.insert("key2".to_string(), "value2".to_string());
1577
1578        let query_ctx = QueryContext {
1579            current_catalog: "test_catalog".to_string(),
1580            current_schema: "test_schema".to_string(),
1581            timezone: "UTC".to_string(),
1582            extensions,
1583            channel: 5,
1584        };
1585
1586        let flow_ctx: FlowQueryContext = query_ctx.into();
1587
1588        assert_eq!(flow_ctx.catalog, "test_catalog");
1589        assert_eq!(flow_ctx.schema, "test_schema");
1590        assert_eq!(flow_ctx.timezone, "UTC");
1591    }
1592
1593    #[test]
1594    fn test_flow_query_context_conversion_to_query_context() {
1595        let flow_ctx = FlowQueryContext {
1596            catalog: "prod_catalog".to_string(),
1597            schema: "public".to_string(),
1598            timezone: "America/New_York".to_string(),
1599        };
1600
1601        let query_ctx: QueryContext = flow_ctx.clone().into();
1602
1603        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1604        assert_eq!(query_ctx.current_schema, "public");
1605        assert_eq!(query_ctx.timezone, "America/New_York");
1606        assert!(query_ctx.extensions.is_empty());
1607        assert_eq!(query_ctx.channel, 0);
1608
1609        // Test roundtrip conversion
1610        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1611        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1612    }
1613
1614    #[test]
1615    fn test_flow_query_context_conversion_from_query_context_ref() {
1616        use common_time::Timezone;
1617        use session::context::QueryContextBuilder;
1618
1619        let session_ctx = QueryContextBuilder::default()
1620            .current_catalog("session_catalog".to_string())
1621            .current_schema("session_schema".to_string())
1622            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
1623            .build();
1624
1625        let session_ctx_ref = Arc::new(session_ctx);
1626        let flow_ctx: FlowQueryContext = session_ctx_ref.into();
1627
1628        assert_eq!(flow_ctx.catalog, "session_catalog");
1629        assert_eq!(flow_ctx.schema, "session_schema");
1630        assert_eq!(flow_ctx.timezone, "Europe/London");
1631    }
1632
1633    #[test]
1634    fn test_flow_query_context_serialization() {
1635        let flow_ctx = FlowQueryContext {
1636            catalog: "test_catalog".to_string(),
1637            schema: "test_schema".to_string(),
1638            timezone: "UTC".to_string(),
1639        };
1640
1641        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1642        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1643
1644        assert_eq!(flow_ctx, deserialized);
1645
1646        // Verify JSON structure
1647        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1648        assert_eq!(json_value["catalog"], "test_catalog");
1649        assert_eq!(json_value["schema"], "test_schema");
1650        assert_eq!(json_value["timezone"], "UTC");
1651    }
1652
1653    #[test]
1654    fn test_flow_query_context_conversion_to_pb() {
1655        let flow_ctx = FlowQueryContext {
1656            catalog: "pb_catalog".to_string(),
1657            schema: "pb_schema".to_string(),
1658            timezone: "Asia/Tokyo".to_string(),
1659        };
1660
1661        let pb_ctx: PbQueryContext = flow_ctx.into();
1662
1663        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1664        assert_eq!(pb_ctx.current_schema, "pb_schema");
1665        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1666        assert!(pb_ctx.extensions.is_empty());
1667        assert_eq!(pb_ctx.channel, 0);
1668        assert!(pb_ctx.snapshot_seqs.is_none());
1669        assert!(pb_ctx.explain.is_none());
1670    }
1671}