common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
22use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
23use api::v1::meta::ddl_task_request::Task;
24use api::v1::meta::{
25    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
26    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
27    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
28    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
29    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
30    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
31    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
32    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
33    TruncateTableTask as PbTruncateTableTask,
34};
35use api::v1::{
36    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
37    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
38    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
39    QueryContext as PbQueryContext, TruncateTableExpr,
40};
41use base64::Engine as _;
42use base64::engine::general_purpose;
43use common_error::ext::BoxedError;
44use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
45use prost::Message;
46use serde::{Deserialize, Serialize};
47use serde_with::{DefaultOnNull, serde_as};
48use session::context::{QueryContextBuilder, QueryContextRef};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{RawTableInfo, TableId};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// DDL tasks.
///
/// Each variant carries the payload of one kind of DDL operation. The
/// trigger variants are only compiled in when the `enterprise` feature is
/// enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a single table.
    CreateTable(CreateTableTask),
    /// Drop a single table.
    DropTable(DropTableTask),
    /// Alter a single table.
    AlterTable(AlterTableTask),
    /// Truncate a table.
    TruncateTable(TruncateTableTask),
    /// Create several logical tables in one task.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Drop several logical tables in one task.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alter several logical tables in one task.
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter a database.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Drop a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
    /// Create a trigger (enterprise only).
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    /// Comment on an object.
    CommentOn(CommentOnTask),
}
84
85impl DdlTask {
86    /// Creates a [`DdlTask`] to create a flow.
87    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
88        DdlTask::CreateFlow(expr)
89    }
90
91    /// Creates a [`DdlTask`] to drop a flow.
92    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
93        DdlTask::DropFlow(expr)
94    }
95
96    /// Creates a [`DdlTask`] to drop a view.
97    pub fn new_drop_view(expr: DropViewTask) -> Self {
98        DdlTask::DropView(expr)
99    }
100
101    /// Creates a [`DdlTask`] to create a table.
102    pub fn new_create_table(
103        expr: CreateTableExpr,
104        partitions: Vec<Partition>,
105        table_info: RawTableInfo,
106    ) -> Self {
107        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
108    }
109
110    /// Creates a [`DdlTask`] to create several logical tables.
111    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
112        DdlTask::CreateLogicalTables(
113            table_data
114                .into_iter()
115                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
116                .collect(),
117        )
118    }
119
120    /// Creates a [`DdlTask`] to alter several logical tables.
121    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
122        DdlTask::AlterLogicalTables(
123            table_data
124                .into_iter()
125                .map(|alter_table| AlterTableTask { alter_table })
126                .collect(),
127        )
128    }
129
130    /// Creates a [`DdlTask`] to drop a table.
131    pub fn new_drop_table(
132        catalog: String,
133        schema: String,
134        table: String,
135        table_id: TableId,
136        drop_if_exists: bool,
137    ) -> Self {
138        DdlTask::DropTable(DropTableTask {
139            catalog,
140            schema,
141            table,
142            table_id,
143            drop_if_exists,
144        })
145    }
146
147    /// Creates a [`DdlTask`] to create a database.
148    pub fn new_create_database(
149        catalog: String,
150        schema: String,
151        create_if_not_exists: bool,
152        options: HashMap<String, String>,
153    ) -> Self {
154        DdlTask::CreateDatabase(CreateDatabaseTask {
155            catalog,
156            schema,
157            create_if_not_exists,
158            options,
159        })
160    }
161
162    /// Creates a [`DdlTask`] to drop a database.
163    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
164        DdlTask::DropDatabase(DropDatabaseTask {
165            catalog,
166            schema,
167            drop_if_exists,
168        })
169    }
170
171    /// Creates a [`DdlTask`] to alter a database.
172    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
173        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
174    }
175
176    /// Creates a [`DdlTask`] to alter a table.
177    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
178        DdlTask::AlterTable(AlterTableTask { alter_table })
179    }
180
181    /// Creates a [`DdlTask`] to truncate a table.
182    pub fn new_truncate_table(
183        catalog: String,
184        schema: String,
185        table: String,
186        table_id: TableId,
187        time_ranges: Vec<(Timestamp, Timestamp)>,
188    ) -> Self {
189        DdlTask::TruncateTable(TruncateTableTask {
190            catalog,
191            schema,
192            table,
193            table_id,
194            time_ranges,
195        })
196    }
197
198    /// Creates a [`DdlTask`] to create a view.
199    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
200        DdlTask::CreateView(CreateViewTask {
201            create_view,
202            view_info,
203        })
204    }
205
206    /// Creates a [`DdlTask`] to comment on a table, column, or flow.
207    pub fn new_comment_on(task: CommentOnTask) -> Self {
208        DdlTask::CommentOn(task)
209    }
210}
211
212impl TryFrom<Task> for DdlTask {
213    type Error = error::Error;
214    fn try_from(task: Task) -> Result<Self> {
215        match task {
216            Task::CreateTableTask(create_table) => {
217                Ok(DdlTask::CreateTable(create_table.try_into()?))
218            }
219            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
220            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
221            Task::TruncateTableTask(truncate_table) => {
222                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
223            }
224            Task::CreateTableTasks(create_tables) => {
225                let tasks = create_tables
226                    .tasks
227                    .into_iter()
228                    .map(|task| task.try_into())
229                    .collect::<Result<Vec<_>>>()?;
230
231                Ok(DdlTask::CreateLogicalTables(tasks))
232            }
233            Task::DropTableTasks(drop_tables) => {
234                let tasks = drop_tables
235                    .tasks
236                    .into_iter()
237                    .map(|task| task.try_into())
238                    .collect::<Result<Vec<_>>>()?;
239
240                Ok(DdlTask::DropLogicalTables(tasks))
241            }
242            Task::AlterTableTasks(alter_tables) => {
243                let tasks = alter_tables
244                    .tasks
245                    .into_iter()
246                    .map(|task| task.try_into())
247                    .collect::<Result<Vec<_>>>()?;
248
249                Ok(DdlTask::AlterLogicalTables(tasks))
250            }
251            Task::CreateDatabaseTask(create_database) => {
252                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
253            }
254            Task::DropDatabaseTask(drop_database) => {
255                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
256            }
257            Task::AlterDatabaseTask(alter_database) => {
258                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
259            }
260            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
261            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
262            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
263            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
264            Task::CreateTriggerTask(create_trigger) => {
265                #[cfg(feature = "enterprise")]
266                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
267                #[cfg(not(feature = "enterprise"))]
268                {
269                    let _ = create_trigger;
270                    crate::error::UnsupportedSnafu {
271                        operation: "create trigger",
272                    }
273                    .fail()
274                }
275            }
276            Task::DropTriggerTask(drop_trigger) => {
277                #[cfg(feature = "enterprise")]
278                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
279                #[cfg(not(feature = "enterprise"))]
280                {
281                    let _ = drop_trigger;
282                    crate::error::UnsupportedSnafu {
283                        operation: "drop trigger",
284                    }
285                    .fail()
286                }
287            }
288            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
289        }
290    }
291}
292
/// A request to submit a [`DdlTask`] together with the query context it is
/// submitted under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context that is sent along with the task.
    pub query_context: QueryContextRef,
    /// The DDL task to submit.
    pub task: DdlTask,
}
298
299impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
300    type Error = error::Error;
301
302    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
303        let task = match request.task {
304            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
305            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
306            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
307            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
308            DdlTask::CreateLogicalTables(tasks) => {
309                let tasks = tasks
310                    .into_iter()
311                    .map(|task| task.try_into())
312                    .collect::<Result<Vec<_>>>()?;
313
314                Task::CreateTableTasks(PbCreateTableTasks { tasks })
315            }
316            DdlTask::DropLogicalTables(tasks) => {
317                let tasks = tasks
318                    .into_iter()
319                    .map(|task| task.into())
320                    .collect::<Vec<_>>();
321
322                Task::DropTableTasks(PbDropTableTasks { tasks })
323            }
324            DdlTask::AlterLogicalTables(tasks) => {
325                let tasks = tasks
326                    .into_iter()
327                    .map(|task| task.try_into())
328                    .collect::<Result<Vec<_>>>()?;
329
330                Task::AlterTableTasks(PbAlterTableTasks { tasks })
331            }
332            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
333            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
334            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
335            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
336            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
337            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
338            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
339            #[cfg(feature = "enterprise")]
340            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
341            #[cfg(feature = "enterprise")]
342            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
343            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
344        };
345
346        Ok(Self {
347            header: None,
348            query_context: Some((*request.query_context).clone().into()),
349            task: Some(task),
350        })
351    }
352}
353
/// The response to a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Key taken from the response's procedure id; empty when the response
    /// carries no procedure id.
    pub key: Vec<u8>,
    /// `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
360
361impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
362    type Error = error::Error;
363
364    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
365        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
366        Ok(Self {
367            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
368            table_ids,
369        })
370    }
371}
372
373impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
374    fn from(val: SubmitDdlTaskResponse) -> Self {
375        Self {
376            pid: Some(ProcedureId { key: val.key }),
377            table_ids: val
378                .table_ids
379                .into_iter()
380                .map(|id| api::v1::TableId { id })
381                .collect(),
382            ..Default::default()
383        }
384    }
385}
386
/// A `CREATE VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf `CREATE VIEW` expression (names, logical plan, definition,
    /// columns, referenced tables).
    pub create_view: CreateViewExpr,
    /// Raw table info of the view; JSON-encoded when converted to protobuf.
    pub view_info: RawTableInfo,
}
393
394impl CreateViewTask {
395    /// Returns the [`TableReference`] of view.
396    pub fn table_ref(&self) -> TableReference<'_> {
397        TableReference {
398            catalog: &self.create_view.catalog_name,
399            schema: &self.create_view.schema_name,
400            table: &self.create_view.view_name,
401        }
402    }
403
404    /// Returns the encoded logical plan
405    pub fn raw_logical_plan(&self) -> &Vec<u8> {
406        &self.create_view.logical_plan
407    }
408
409    /// Returns the view definition in SQL
410    pub fn view_definition(&self) -> &str {
411        &self.create_view.definition
412    }
413
414    /// Returns the resolved table names in view's logical plan
415    pub fn table_names(&self) -> HashSet<TableName> {
416        self.create_view
417            .table_names
418            .iter()
419            .map(|t| t.clone().into())
420            .collect()
421    }
422
423    /// Returns the view's columns
424    pub fn columns(&self) -> &Vec<String> {
425        &self.create_view.columns
426    }
427
428    /// Returns the original logical plan's columns
429    pub fn plan_columns(&self) -> &Vec<String> {
430        &self.create_view.plan_columns
431    }
432}
433
434impl TryFrom<PbCreateViewTask> for CreateViewTask {
435    type Error = error::Error;
436
437    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
438        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
439
440        Ok(CreateViewTask {
441            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
442                err_msg: "expected create view",
443            })?,
444            view_info,
445        })
446    }
447}
448
449impl TryFrom<CreateViewTask> for PbCreateViewTask {
450    type Error = error::Error;
451
452    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
453        Ok(PbCreateViewTask {
454            create_view: Some(task.create_view),
455            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
456        })
457    }
458}
459
460impl Serialize for CreateViewTask {
461    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
462    where
463        S: serde::Serializer,
464    {
465        let view_info = serde_json::to_vec(&self.view_info)
466            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
467
468        let pb = PbCreateViewTask {
469            create_view: Some(self.create_view.clone()),
470            view_info,
471        };
472        let buf = pb.encode_to_vec();
473        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
474        serializer.serialize_str(&encoded)
475    }
476}
477
478impl<'de> Deserialize<'de> for CreateViewTask {
479    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
480    where
481        D: serde::Deserializer<'de>,
482    {
483        let encoded = String::deserialize(deserializer)?;
484        let buf = general_purpose::STANDARD_NO_PAD
485            .decode(encoded)
486            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
487        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
488            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
489
490        let expr = CreateViewTask::try_from(expr)
491            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
492
493        Ok(expr)
494    }
495}
496
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog the view belongs to.
    pub catalog: String,
    /// Schema the view belongs to.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// Id of the view.
    pub view_id: TableId,
    /// Value of the expression's `drop_if_exists` flag.
    pub drop_if_exists: bool,
}
506
507impl DropViewTask {
508    /// Returns the [`TableReference`] of view.
509    pub fn table_ref(&self) -> TableReference<'_> {
510        TableReference {
511            catalog: &self.catalog,
512            schema: &self.schema,
513            table: &self.view,
514        }
515    }
516}
517
518impl TryFrom<PbDropViewTask> for DropViewTask {
519    type Error = error::Error;
520
521    fn try_from(pb: PbDropViewTask) -> Result<Self> {
522        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
523            err_msg: "expected drop view",
524        })?;
525
526        Ok(DropViewTask {
527            catalog: expr.catalog_name,
528            schema: expr.schema_name,
529            view: expr.view_name,
530            view_id: expr
531                .view_id
532                .context(error::InvalidProtoMsgSnafu {
533                    err_msg: "expected view_id",
534                })?
535                .id,
536            drop_if_exists: expr.drop_if_exists,
537        })
538    }
539}
540
541impl From<DropViewTask> for PbDropViewTask {
542    fn from(task: DropViewTask) -> Self {
543        PbDropViewTask {
544            drop_view: Some(DropViewExpr {
545                catalog_name: task.catalog,
546                schema_name: task.schema,
547                view_name: task.view,
548                view_id: Some(api::v1::TableId { id: task.view_id }),
549                drop_if_exists: task.drop_if_exists,
550            }),
551        }
552    }
553}
554
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog the table belongs to.
    pub catalog: String,
    /// Schema the table belongs to.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Value of the expression's `drop_if_exists` flag. Defaults to `false`
    /// when the field is absent from the serialized form.
    #[serde(default)]
    pub drop_if_exists: bool,
}
564
565impl DropTableTask {
566    pub fn table_ref(&self) -> TableReference<'_> {
567        TableReference {
568            catalog: &self.catalog,
569            schema: &self.schema,
570            table: &self.table,
571        }
572    }
573
574    pub fn table_name(&self) -> TableName {
575        TableName {
576            catalog_name: self.catalog.clone(),
577            schema_name: self.schema.clone(),
578            table_name: self.table.clone(),
579        }
580    }
581}
582
583impl TryFrom<PbDropTableTask> for DropTableTask {
584    type Error = error::Error;
585
586    fn try_from(pb: PbDropTableTask) -> Result<Self> {
587        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
588            err_msg: "expected drop table",
589        })?;
590
591        Ok(Self {
592            catalog: drop_table.catalog_name,
593            schema: drop_table.schema_name,
594            table: drop_table.table_name,
595            table_id: drop_table
596                .table_id
597                .context(error::InvalidProtoMsgSnafu {
598                    err_msg: "expected table_id",
599                })?
600                .id,
601            drop_if_exists: drop_table.drop_if_exists,
602        })
603    }
604}
605
606impl From<DropTableTask> for PbDropTableTask {
607    fn from(task: DropTableTask) -> Self {
608        PbDropTableTask {
609            drop_table: Some(DropTableExpr {
610                catalog_name: task.catalog,
611                schema_name: task.schema,
612                table_name: task.table,
613                table_id: Some(api::v1::TableId { id: task.table_id }),
614                drop_if_exists: task.drop_if_exists,
615            }),
616        }
617    }
618}
619
/// A `CREATE TABLE` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The protobuf `CREATE TABLE` expression.
    pub create_table: CreateTableExpr,
    /// Partitions of the table; empty for tasks built via
    /// `DdlTask::new_create_logical_tables`.
    pub partitions: Vec<Partition>,
    /// Raw table info; JSON-encoded when converted to protobuf.
    pub table_info: RawTableInfo,
}
626
627impl TryFrom<PbCreateTableTask> for CreateTableTask {
628    type Error = error::Error;
629
630    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
631        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
632
633        Ok(CreateTableTask::new(
634            pb.create_table.context(error::InvalidProtoMsgSnafu {
635                err_msg: "expected create table",
636            })?,
637            pb.partitions,
638            table_info,
639        ))
640    }
641}
642
643impl TryFrom<CreateTableTask> for PbCreateTableTask {
644    type Error = error::Error;
645
646    fn try_from(task: CreateTableTask) -> Result<Self> {
647        Ok(PbCreateTableTask {
648            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
649            create_table: Some(task.create_table),
650            partitions: task.partitions,
651        })
652    }
653}
654
655impl CreateTableTask {
656    pub fn new(
657        expr: CreateTableExpr,
658        partitions: Vec<Partition>,
659        table_info: RawTableInfo,
660    ) -> CreateTableTask {
661        CreateTableTask {
662            create_table: expr,
663            partitions,
664            table_info,
665        }
666    }
667
668    pub fn table_name(&self) -> TableName {
669        let table = &self.create_table;
670
671        TableName {
672            catalog_name: table.catalog_name.clone(),
673            schema_name: table.schema_name.clone(),
674            table_name: table.table_name.clone(),
675        }
676    }
677
678    pub fn table_ref(&self) -> TableReference<'_> {
679        let table = &self.create_table;
680
681        TableReference {
682            catalog: &table.catalog_name,
683            schema: &table.schema_name,
684            table: &table.table_name,
685        }
686    }
687
688    /// Sets the `table_info`'s table_id.
689    pub fn set_table_id(&mut self, table_id: TableId) {
690        self.table_info.ident.table_id = table_id;
691    }
692
693    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
694    ///
695    /// This function won't do any check or verification. Caller should
696    /// ensure this task is valid.
697    pub fn sort_columns(&mut self) {
698        // sort create table expr
699        // sort column_defs by name
700        self.create_table
701            .column_defs
702            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
703
704        self.table_info.sort_columns();
705    }
706}
707
708impl Serialize for CreateTableTask {
709    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
710    where
711        S: serde::Serializer,
712    {
713        let table_info = serde_json::to_vec(&self.table_info)
714            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
715
716        let pb = PbCreateTableTask {
717            create_table: Some(self.create_table.clone()),
718            partitions: self.partitions.clone(),
719            table_info,
720        };
721        let buf = pb.encode_to_vec();
722        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
723        serializer.serialize_str(&encoded)
724    }
725}
726
727impl<'de> Deserialize<'de> for CreateTableTask {
728    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
729    where
730        D: serde::Deserializer<'de>,
731    {
732        let encoded = String::deserialize(deserializer)?;
733        let buf = general_purpose::STANDARD_NO_PAD
734            .decode(encoded)
735            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
736        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
737            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
738
739        let expr = CreateTableTask::try_from(expr)
740            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
741
742        Ok(expr)
743    }
744}
745
/// An `ALTER TABLE` task wrapping the protobuf alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    /// The protobuf `ALTER TABLE` expression.
    pub alter_table: AlterTableExpr,
}
751
752impl AlterTableTask {
753    pub fn validate(&self) -> Result<()> {
754        self.alter_table
755            .kind
756            .as_ref()
757            .context(error::UnexpectedSnafu {
758                err_msg: "'kind' is absent",
759            })?;
760        Ok(())
761    }
762
763    pub fn table_ref(&self) -> TableReference<'_> {
764        TableReference {
765            catalog: &self.alter_table.catalog_name,
766            schema: &self.alter_table.schema_name,
767            table: &self.alter_table.table_name,
768        }
769    }
770
771    pub fn table_name(&self) -> TableName {
772        let table = &self.alter_table;
773
774        TableName {
775            catalog_name: table.catalog_name.clone(),
776            schema_name: table.schema_name.clone(),
777            table_name: table.table_name.clone(),
778        }
779    }
780}
781
782impl TryFrom<PbAlterTableTask> for AlterTableTask {
783    type Error = error::Error;
784
785    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
786        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
787            err_msg: "expected alter_table",
788        })?;
789
790        Ok(AlterTableTask { alter_table })
791    }
792}
793
794impl TryFrom<AlterTableTask> for PbAlterTableTask {
795    type Error = error::Error;
796
797    fn try_from(task: AlterTableTask) -> Result<Self> {
798        Ok(PbAlterTableTask {
799            alter_table: Some(task.alter_table),
800        })
801    }
802}
803
804impl Serialize for AlterTableTask {
805    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
806    where
807        S: serde::Serializer,
808    {
809        let pb = PbAlterTableTask {
810            alter_table: Some(self.alter_table.clone()),
811        };
812        let buf = pb.encode_to_vec();
813        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
814        serializer.serialize_str(&encoded)
815    }
816}
817
818impl<'de> Deserialize<'de> for AlterTableTask {
819    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
820    where
821        D: serde::Deserializer<'de>,
822    {
823        let encoded = String::deserialize(deserializer)?;
824        let buf = general_purpose::STANDARD_NO_PAD
825            .decode(encoded)
826            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
827        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
828            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
829
830        let expr = AlterTableTask::try_from(expr)
831            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
832
833        Ok(expr)
834    }
835}
836
837#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
838pub struct TruncateTableTask {
839    pub catalog: String,
840    pub schema: String,
841    pub table: String,
842    pub table_id: TableId,
843    pub time_ranges: Vec<(Timestamp, Timestamp)>,
844}
845
846impl TruncateTableTask {
847    pub fn table_ref(&self) -> TableReference<'_> {
848        TableReference {
849            catalog: &self.catalog,
850            schema: &self.schema,
851            table: &self.table,
852        }
853    }
854
855    pub fn table_name(&self) -> TableName {
856        TableName {
857            catalog_name: self.catalog.clone(),
858            schema_name: self.schema.clone(),
859            table_name: self.table.clone(),
860        }
861    }
862}
863
864impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
865    type Error = error::Error;
866
867    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
868        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
869            err_msg: "expected truncate table",
870        })?;
871
872        Ok(Self {
873            catalog: truncate_table.catalog_name,
874            schema: truncate_table.schema_name,
875            table: truncate_table.table_name,
876            table_id: truncate_table
877                .table_id
878                .context(error::InvalidProtoMsgSnafu {
879                    err_msg: "expected table_id",
880                })?
881                .id,
882            time_ranges: truncate_table
883                .time_ranges
884                .map(from_pb_time_ranges)
885                .transpose()
886                .map_err(BoxedError::new)
887                .context(ExternalSnafu)?
888                .unwrap_or_default(),
889        })
890    }
891}
892
893impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
894    type Error = error::Error;
895
896    fn try_from(task: TruncateTableTask) -> Result<Self> {
897        Ok(PbTruncateTableTask {
898            truncate_table: Some(TruncateTableExpr {
899                catalog_name: task.catalog,
900                schema_name: task.schema,
901                table_name: task.table,
902                table_id: Some(api::v1::TableId { id: task.table_id }),
903                time_ranges: Some(
904                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
905                ),
906            }),
907        })
908    }
909}
910
/// Create-database DDL task; the serializable counterpart of the protobuf
/// `PbCreateDatabaseTask` (see the `TryFrom` conversions in both directions).
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    /// Catalog name; maps to `CreateDatabaseExpr::catalog_name`.
    pub catalog: String,
    /// Schema name; maps to `CreateDatabaseExpr::schema_name`.
    pub schema: String,
    /// Copied verbatim to/from `CreateDatabaseExpr::create_if_not_exists`.
    // NOTE(review): presumably suppresses the "already exists" error; the handling is not in
    // this file.
    pub create_if_not_exists: bool,
    /// Key/value options; maps to `CreateDatabaseExpr::options`.
    ///
    /// `DefaultOnNull` is applied on deserialization only.
    // NOTE(review): presumably a serialized `null` becomes an empty map — confirm against
    // the `serde_with` documentation.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
920
921impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
922    type Error = error::Error;
923
924    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
925        let CreateDatabaseExpr {
926            catalog_name,
927            schema_name,
928            create_if_not_exists,
929            options,
930        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
931            err_msg: "expected create database",
932        })?;
933
934        Ok(CreateDatabaseTask {
935            catalog: catalog_name,
936            schema: schema_name,
937            create_if_not_exists,
938            options,
939        })
940    }
941}
942
943impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
944    type Error = error::Error;
945
946    fn try_from(
947        CreateDatabaseTask {
948            catalog,
949            schema,
950            create_if_not_exists,
951            options,
952        }: CreateDatabaseTask,
953    ) -> Result<Self> {
954        Ok(PbCreateDatabaseTask {
955            create_database: Some(CreateDatabaseExpr {
956                catalog_name: catalog,
957                schema_name: schema,
958                create_if_not_exists,
959                options,
960            }),
961        })
962    }
963}
964
/// Drop-database DDL task; the serializable counterpart of the protobuf
/// `PbDropDatabaseTask` (see the `TryFrom` conversions in both directions).
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    /// Catalog name; maps to `DropDatabaseExpr::catalog_name`.
    pub catalog: String,
    /// Schema name; maps to `DropDatabaseExpr::schema_name`.
    pub schema: String,
    /// Copied verbatim to/from `DropDatabaseExpr::drop_if_exists`.
    // NOTE(review): presumably suppresses the "not found" error; the handling is not in this
    // file.
    pub drop_if_exists: bool,
}
971
972impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
973    type Error = error::Error;
974
975    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
976        let DropDatabaseExpr {
977            catalog_name,
978            schema_name,
979            drop_if_exists,
980        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
981            err_msg: "expected drop database",
982        })?;
983
984        Ok(DropDatabaseTask {
985            catalog: catalog_name,
986            schema: schema_name,
987            drop_if_exists,
988        })
989    }
990}
991
992impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
993    type Error = error::Error;
994
995    fn try_from(
996        DropDatabaseTask {
997            catalog,
998            schema,
999            drop_if_exists,
1000        }: DropDatabaseTask,
1001    ) -> Result<Self> {
1002        Ok(PbDropDatabaseTask {
1003            drop_database: Some(DropDatabaseExpr {
1004                catalog_name: catalog,
1005                schema_name: schema,
1006                drop_if_exists,
1007            }),
1008        })
1009    }
1010}
1011
/// Alter-database DDL task: a thin wrapper around the protobuf [`AlterDatabaseExpr`].
///
/// Unlike the neighbouring task types, the derive list here does not include
/// `Serialize`/`Deserialize`.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    /// The protobuf expression, stored as-is.
    pub alter_expr: AlterDatabaseExpr,
}
1016
1017impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1018    type Error = error::Error;
1019
1020    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1021        Ok(PbAlterDatabaseTask {
1022            task: Some(task.alter_expr),
1023        })
1024    }
1025}
1026
1027impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1028    type Error = error::Error;
1029
1030    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1031        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1032            err_msg: "expected alter database",
1033        })?;
1034
1035        Ok(AlterDatabaseTask { alter_expr })
1036    }
1037}
1038
1039impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1040    type Error = error::Error;
1041
1042    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1043        match pb {
1044            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1045                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1046                    options
1047                        .set_database_options
1048                        .into_iter()
1049                        .map(SetDatabaseOption::try_from)
1050                        .collect::<Result<Vec<_>>>()?,
1051                )))
1052            }
1053            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1054                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1055                    options
1056                        .keys
1057                        .iter()
1058                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1059                        .collect::<Result<Vec<_>>>()?,
1060                )),
1061            ),
1062        }
1063    }
1064}
1065
/// Option key that selects the `Ttl` variants of [`SetDatabaseOption`] and
/// [`UnsetDatabaseOption`]; incoming keys are ASCII-lower-cased before being compared to it.
const TTL_KEY: &str = "ttl";
1067
1068impl TryFrom<PbOption> for SetDatabaseOption {
1069    type Error = error::Error;
1070
1071    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1072        let key_lower = key.to_ascii_lowercase();
1073        match key_lower.as_str() {
1074            TTL_KEY => {
1075                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1076                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1077
1078                Ok(SetDatabaseOption::Ttl(ttl))
1079            }
1080            _ => {
1081                if validate_database_option(&key_lower) {
1082                    Ok(SetDatabaseOption::Other(key_lower, value))
1083                } else {
1084                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1085                }
1086            }
1087        }
1088    }
1089}
1090
/// A single, validated "set option" entry of an alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// Parsed value of the `ttl` option.
    Ttl(DatabaseTimeToLive),
    /// Any other option, as `(key, value)`. When built through `TryFrom<PbOption>` the key is
    /// ASCII-lower-cased and has passed `validate_database_option`.
    Other(String, String),
}
1096
/// A single, validated "unset option" entry of an alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Unset the `ttl` option.
    Ttl,
    /// Unset any other option, identified by key. When built through `TryFrom<&str>` the key
    /// is ASCII-lower-cased and has passed `validate_database_option`.
    Other(String),
}
1102
1103impl TryFrom<&str> for UnsetDatabaseOption {
1104    type Error = error::Error;
1105
1106    fn try_from(key: &str) -> Result<Self> {
1107        let key_lower = key.to_ascii_lowercase();
1108        match key_lower.as_str() {
1109            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1110            _ => {
1111                if validate_database_option(&key_lower) {
1112                    Ok(UnsetDatabaseOption::Other(key_lower))
1113                } else {
1114                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1115                }
1116            }
1117        }
1118    }
1119}
1120
/// Newtype over the list of options to set in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1123
/// Newtype over the list of options to unset in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1126
/// Validated form of the protobuf `PbAlterDatabaseKind`: either a batch of options to set or
/// a batch of options to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    /// Set the contained options.
    SetDatabaseOptions(SetDatabaseOptions),
    /// Unset the contained options.
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1132
1133impl AlterDatabaseTask {
1134    pub fn catalog(&self) -> &str {
1135        &self.alter_expr.catalog_name
1136    }
1137
1138    pub fn schema(&self) -> &str {
1139        &self.alter_expr.catalog_name
1140    }
1141}
1142
/// Create-flow DDL task; the serializable counterpart of the protobuf `PbCreateFlowTask`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    /// Catalog name; maps to `CreateFlowExpr::catalog_name`.
    pub catalog_name: String,
    /// Flow name; maps to `CreateFlowExpr::flow_name`.
    pub flow_name: String,
    /// Source tables; maps to `CreateFlowExpr::source_table_names`.
    pub source_table_names: Vec<TableName>,
    /// Sink table; required when converting from protobuf.
    pub sink_table_name: TableName,
    /// Copied verbatim to/from `CreateFlowExpr::or_replace`.
    pub or_replace: bool,
    /// Copied verbatim to/from `CreateFlowExpr::create_if_not_exists`.
    pub create_if_not_exists: bool,
    /// Duration in seconds. Data older than this duration will not be used.
    pub expire_after: Option<i64>,
    /// Maps to the `seconds` field of the protobuf `EvalInterval`.
    pub eval_interval_secs: Option<i64>,
    /// Comment text; maps to `CreateFlowExpr::comment`.
    pub comment: String,
    /// SQL text; maps to `CreateFlowExpr::sql`.
    pub sql: String,
    /// Key/value flow options; maps to `CreateFlowExpr::flow_options`.
    pub flow_options: HashMap<String, String>,
}
1159
1160impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1161    type Error = error::Error;
1162
1163    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1164        let CreateFlowExpr {
1165            catalog_name,
1166            flow_name,
1167            source_table_names,
1168            sink_table_name,
1169            or_replace,
1170            create_if_not_exists,
1171            expire_after,
1172            eval_interval,
1173            comment,
1174            sql,
1175            flow_options,
1176        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1177            err_msg: "expected create_flow",
1178        })?;
1179
1180        Ok(CreateFlowTask {
1181            catalog_name,
1182            flow_name,
1183            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1184            sink_table_name: sink_table_name
1185                .context(error::InvalidProtoMsgSnafu {
1186                    err_msg: "expected sink_table_name",
1187                })?
1188                .into(),
1189            or_replace,
1190            create_if_not_exists,
1191            expire_after: expire_after.map(|e| e.value),
1192            eval_interval_secs: eval_interval.map(|e| e.seconds),
1193            comment,
1194            sql,
1195            flow_options,
1196        })
1197    }
1198}
1199
1200impl From<CreateFlowTask> for PbCreateFlowTask {
1201    fn from(
1202        CreateFlowTask {
1203            catalog_name,
1204            flow_name,
1205            source_table_names,
1206            sink_table_name,
1207            or_replace,
1208            create_if_not_exists,
1209            expire_after,
1210            eval_interval_secs: eval_interval,
1211            comment,
1212            sql,
1213            flow_options,
1214        }: CreateFlowTask,
1215    ) -> Self {
1216        PbCreateFlowTask {
1217            create_flow: Some(CreateFlowExpr {
1218                catalog_name,
1219                flow_name,
1220                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1221                sink_table_name: Some(sink_table_name.into()),
1222                or_replace,
1223                create_if_not_exists,
1224                expire_after: expire_after.map(|value| ExpireAfter { value }),
1225                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1226                comment,
1227                sql,
1228                flow_options,
1229            }),
1230        }
1231    }
1232}
1233
/// Drop-flow DDL task; the serializable counterpart of the protobuf `PbDropFlowTask`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    /// Catalog name; maps to `DropFlowExpr::catalog_name`.
    pub catalog_name: String,
    /// Flow name; maps to `DropFlowExpr::flow_name`.
    pub flow_name: String,
    /// Flow id; required when converting from protobuf.
    pub flow_id: FlowId,
    /// Copied verbatim to/from `DropFlowExpr::drop_if_exists`.
    pub drop_if_exists: bool,
}
1242
1243impl TryFrom<PbDropFlowTask> for DropFlowTask {
1244    type Error = error::Error;
1245
1246    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1247        let DropFlowExpr {
1248            catalog_name,
1249            flow_name,
1250            flow_id,
1251            drop_if_exists,
1252        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1253            err_msg: "expected drop_flow",
1254        })?;
1255        let flow_id = flow_id
1256            .context(error::InvalidProtoMsgSnafu {
1257                err_msg: "expected flow_id",
1258            })?
1259            .id;
1260        Ok(DropFlowTask {
1261            catalog_name,
1262            flow_name,
1263            flow_id,
1264            drop_if_exists,
1265        })
1266    }
1267}
1268
1269impl From<DropFlowTask> for PbDropFlowTask {
1270    fn from(
1271        DropFlowTask {
1272            catalog_name,
1273            flow_name,
1274            flow_id,
1275            drop_if_exists,
1276        }: DropFlowTask,
1277    ) -> Self {
1278        PbDropFlowTask {
1279            drop_flow: Some(DropFlowExpr {
1280                catalog_name,
1281                flow_name,
1282                flow_id: Some(api::v1::FlowId { id: flow_id }),
1283                drop_if_exists,
1284            }),
1285        }
1286    }
1287}
1288
/// Represents the ID of the object being commented on (Table or Flow).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    /// ID of a table.
    Table(TableId),
    /// ID of a flow.
    Flow(FlowId),
}
1295
/// Comment on table, column, or flow.
///
/// `object_id` has no counterpart in the protobuf `CommentOnExpr`: converting from protobuf
/// sets it to `None`, and converting to protobuf drops it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    /// Catalog name; maps to `CommentOnExpr::catalog_name`.
    pub catalog_name: String,
    /// Schema name; maps to `CommentOnExpr::schema_name`.
    pub schema_name: String,
    /// Kind of object the comment is attached to.
    pub object_type: CommentObjectType,
    /// Object name; maps to `CommentOnExpr::object_name`.
    pub object_name: String,
    /// Column name (only for Column comments). An empty protobuf string becomes `None`.
    pub column_name: Option<String>,
    /// Object ID (Table or Flow) for validation and cache invalidation
    pub object_id: Option<CommentObjectId>,
    /// Comment text. An empty protobuf string becomes `None`.
    pub comment: Option<String>,
}
1309
/// Kind of object a comment is attached to. `TryFrom<i32>` maps the integer codes 0, 1 and 2
/// to `Table`, `Column` and `Flow` respectively.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    /// A table (integer code 0).
    Table,
    /// A column (integer code 1).
    Column,
    /// A flow (integer code 2).
    Flow,
}
1316
1317impl CommentOnTask {
1318    pub fn table_ref(&self) -> TableReference<'_> {
1319        TableReference {
1320            catalog: &self.catalog_name,
1321            schema: &self.schema_name,
1322            table: &self.object_name,
1323        }
1324    }
1325}
1326
1327// Proto conversions for CommentObjectType
1328impl From<CommentObjectType> for PbCommentObjectType {
1329    fn from(object_type: CommentObjectType) -> Self {
1330        match object_type {
1331            CommentObjectType::Table => PbCommentObjectType::Table,
1332            CommentObjectType::Column => PbCommentObjectType::Column,
1333            CommentObjectType::Flow => PbCommentObjectType::Flow,
1334        }
1335    }
1336}
1337
1338impl TryFrom<i32> for CommentObjectType {
1339    type Error = error::Error;
1340
1341    fn try_from(value: i32) -> Result<Self> {
1342        match value {
1343            0 => Ok(CommentObjectType::Table),
1344            1 => Ok(CommentObjectType::Column),
1345            2 => Ok(CommentObjectType::Flow),
1346            _ => error::InvalidProtoMsgSnafu {
1347                err_msg: format!(
1348                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1349                    value
1350                ),
1351            }
1352            .fail(),
1353        }
1354    }
1355}
1356
1357// Proto conversions for CommentOnTask
1358impl TryFrom<PbCommentOnTask> for CommentOnTask {
1359    type Error = error::Error;
1360
1361    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1362        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1363            err_msg: "expected comment_on",
1364        })?;
1365
1366        Ok(CommentOnTask {
1367            catalog_name: comment_on.catalog_name,
1368            schema_name: comment_on.schema_name,
1369            object_type: comment_on.object_type.try_into()?,
1370            object_name: comment_on.object_name,
1371            column_name: if comment_on.column_name.is_empty() {
1372                None
1373            } else {
1374                Some(comment_on.column_name)
1375            },
1376            comment: if comment_on.comment.is_empty() {
1377                None
1378            } else {
1379                Some(comment_on.comment)
1380            },
1381            object_id: None,
1382        })
1383    }
1384}
1385
1386impl From<CommentOnTask> for PbCommentOnTask {
1387    fn from(task: CommentOnTask) -> Self {
1388        let pb_object_type: PbCommentObjectType = task.object_type.into();
1389        PbCommentOnTask {
1390            comment_on: Some(CommentOnExpr {
1391                catalog_name: task.catalog_name,
1392                schema_name: task.schema_name,
1393                object_type: pb_object_type as i32,
1394                object_name: task.object_name,
1395                column_name: task.column_name.unwrap_or_default(),
1396                comment: task.comment.unwrap_or_default(),
1397            }),
1398        }
1399    }
1400}
1401
/// Serializable, plain-data form of a query context. It can be built from a
/// `QueryContextRef` and converted to a session `QueryContext` or a `PbQueryContext`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    /// Current catalog name.
    pub(crate) current_catalog: String,
    /// Current schema name.
    pub(crate) current_schema: String,
    /// Timezone in string form; parsed with `Timezone::from_tz_string` when converting to a
    /// session context.
    pub(crate) timezone: String,
    /// Key/value extensions.
    pub(crate) extensions: HashMap<String, String>,
    /// Channel stored as a `u8`; produced from the session context's channel with `as u8`.
    pub(crate) channel: u8,
}
1410
1411impl QueryContext {
1412    /// Get the current catalog
1413    pub fn current_catalog(&self) -> &str {
1414        &self.current_catalog
1415    }
1416
1417    /// Get the current schema
1418    pub fn current_schema(&self) -> &str {
1419        &self.current_schema
1420    }
1421
1422    /// Get the timezone
1423    pub fn timezone(&self) -> &str {
1424        &self.timezone
1425    }
1426
1427    /// Get the extensions
1428    pub fn extensions(&self) -> &HashMap<String, String> {
1429        &self.extensions
1430    }
1431
1432    /// Get the channel
1433    pub fn channel(&self) -> u8 {
1434        self.channel
1435    }
1436}
1437
/// Lightweight query context for flow operations containing only essential fields.
/// This is a subset of QueryContext that includes only the fields actually needed
/// for flow creation and execution.
///
/// `Serialize` is derived, while `Deserialize` is implemented by hand so that both this
/// layout and the full `QueryContext` layout are accepted on input.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    /// Current catalog name - needed for flow metadata and recovery
    pub(crate) catalog: String,
    /// Current schema name - needed for table resolution during flow execution
    pub(crate) schema: String,
    /// Timezone for timestamp operations in the flow
    pub(crate) timezone: String,
}
1450
1451impl<'de> Deserialize<'de> for FlowQueryContext {
1452    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1453    where
1454        D: serde::Deserializer<'de>,
1455    {
1456        // Support both QueryContext format and FlowQueryContext format
1457        #[derive(Deserialize)]
1458        #[serde(untagged)]
1459        enum ContextCompat {
1460            Flow(FlowQueryContextHelper),
1461            Full(QueryContext),
1462        }
1463
1464        #[derive(Deserialize)]
1465        struct FlowQueryContextHelper {
1466            catalog: String,
1467            schema: String,
1468            timezone: String,
1469        }
1470
1471        match ContextCompat::deserialize(deserializer)? {
1472            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1473                catalog: helper.catalog,
1474                schema: helper.schema,
1475                timezone: helper.timezone,
1476            }),
1477            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1478        }
1479    }
1480}
1481
1482impl From<QueryContextRef> for QueryContext {
1483    fn from(query_context: QueryContextRef) -> Self {
1484        QueryContext {
1485            current_catalog: query_context.current_catalog().to_string(),
1486            current_schema: query_context.current_schema().clone(),
1487            timezone: query_context.timezone().to_string(),
1488            extensions: query_context.extensions(),
1489            channel: query_context.channel() as u8,
1490        }
1491    }
1492}
1493
1494impl TryFrom<QueryContext> for session::context::QueryContext {
1495    type Error = error::Error;
1496    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1497        Ok(QueryContextBuilder::default()
1498            .current_catalog(value.current_catalog)
1499            .current_schema(value.current_schema)
1500            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1501            .extensions(value.extensions)
1502            .channel((value.channel as u32).into())
1503            .build())
1504    }
1505}
1506
1507impl From<QueryContext> for PbQueryContext {
1508    fn from(
1509        QueryContext {
1510            current_catalog,
1511            current_schema,
1512            timezone,
1513            extensions,
1514            channel,
1515        }: QueryContext,
1516    ) -> Self {
1517        PbQueryContext {
1518            current_catalog,
1519            current_schema,
1520            timezone,
1521            extensions,
1522            channel: channel as u32,
1523            snapshot_seqs: None,
1524            explain: None,
1525        }
1526    }
1527}
1528
1529impl From<QueryContext> for FlowQueryContext {
1530    fn from(ctx: QueryContext) -> Self {
1531        Self {
1532            catalog: ctx.current_catalog,
1533            schema: ctx.current_schema,
1534            timezone: ctx.timezone,
1535        }
1536    }
1537}
1538
1539impl From<QueryContextRef> for FlowQueryContext {
1540    fn from(ctx: QueryContextRef) -> Self {
1541        Self {
1542            catalog: ctx.current_catalog().to_string(),
1543            schema: ctx.current_schema().clone(),
1544            timezone: ctx.timezone().to_string(),
1545        }
1546    }
1547}
1548
1549impl From<FlowQueryContext> for QueryContext {
1550    fn from(flow_ctx: FlowQueryContext) -> Self {
1551        Self {
1552            current_catalog: flow_ctx.catalog,
1553            current_schema: flow_ctx.schema,
1554            timezone: flow_ctx.timezone,
1555            extensions: HashMap::new(),
1556            channel: 0, // Use default channel for flows
1557        }
1558    }
1559}
1560
1561impl From<FlowQueryContext> for PbQueryContext {
1562    fn from(flow_ctx: FlowQueryContext) -> Self {
1563        let query_ctx: QueryContext = flow_ctx.into();
1564        query_ctx.into()
1565    }
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570    use std::sync::Arc;
1571
1572    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1573    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1574    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1575    use store_api::storage::ConcreteDataType;
1576    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1577    use table::test_util::table_info::test_table_info;
1578
1579    use super::{AlterTableTask, CreateTableTask, *};
1580
1581    #[test]
1582    fn test_basic_ser_de_create_table_task() {
1583        let schema = SchemaBuilder::default().build().unwrap();
1584        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1585        let task = CreateTableTask::new(
1586            CreateTableExpr::default(),
1587            Vec::new(),
1588            RawTableInfo::from(table_info),
1589        );
1590
1591        let output = serde_json::to_vec(&task).unwrap();
1592
1593        let de = serde_json::from_slice(&output).unwrap();
1594        assert_eq!(task, de);
1595    }
1596
1597    #[test]
1598    fn test_basic_ser_de_alter_table_task() {
1599        let task = AlterTableTask {
1600            alter_table: AlterTableExpr::default(),
1601        };
1602
1603        let output = serde_json::to_vec(&task).unwrap();
1604
1605        let de = serde_json::from_slice(&output).unwrap();
1606        assert_eq!(task, de);
1607    }
1608
    /// Checks that `CreateTableTask::sort_columns` reorders the column definitions and
    /// rewrites the index-based metadata (`timestamp_index`, `primary_key_indices`,
    /// `value_indices`) to match the new positions.
    // NOTE(review): `sort_columns` is defined outside this view; the expected order
    // (column1, column2, column3) looks like name order — confirm against its definition.
    #[test]
    fn test_sort_columns() {
        // Schema with the columns deliberately out of order: column3 (tag), column1 (time
        // index, at position 1), column2 (field).
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            timestamp_index: Some(1),
            version: 0,
        };

        // Table meta on the metric engine; the indices refer to positions in the unsorted
        // schema above (primary key = column3 at 0, value = column2 at 2).
        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            options: Default::default(),
            created_on: Default::default(),
            updated_on: Default::default(),
            partition_key_indices: Default::default(),
            column_ids: Default::default(),
        };

        // Table info wrapping the meta; every other field is left at its default.
        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // Create-table expression listing the same columns in the same unsorted order.
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        // Call the sort_columns method
        create_table_task.sort_columns();

        // The column definitions must now read column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // The index metadata must follow the columns to their new positions: time index
        // column1 at 0, primary key column3 at 2, and value indices [0, 1].
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
    }
1714
1715    #[test]
1716    fn test_flow_query_context_conversion_from_query_context() {
1717        use std::collections::HashMap;
1718        let mut extensions = HashMap::new();
1719        extensions.insert("key1".to_string(), "value1".to_string());
1720        extensions.insert("key2".to_string(), "value2".to_string());
1721
1722        let query_ctx = QueryContext {
1723            current_catalog: "test_catalog".to_string(),
1724            current_schema: "test_schema".to_string(),
1725            timezone: "UTC".to_string(),
1726            extensions,
1727            channel: 5,
1728        };
1729
1730        let flow_ctx: FlowQueryContext = query_ctx.into();
1731
1732        assert_eq!(flow_ctx.catalog, "test_catalog");
1733        assert_eq!(flow_ctx.schema, "test_schema");
1734        assert_eq!(flow_ctx.timezone, "UTC");
1735    }
1736
1737    #[test]
1738    fn test_flow_query_context_conversion_to_query_context() {
1739        let flow_ctx = FlowQueryContext {
1740            catalog: "prod_catalog".to_string(),
1741            schema: "public".to_string(),
1742            timezone: "America/New_York".to_string(),
1743        };
1744
1745        let query_ctx: QueryContext = flow_ctx.clone().into();
1746
1747        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1748        assert_eq!(query_ctx.current_schema, "public");
1749        assert_eq!(query_ctx.timezone, "America/New_York");
1750        assert!(query_ctx.extensions.is_empty());
1751        assert_eq!(query_ctx.channel, 0);
1752
1753        // Test roundtrip conversion
1754        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1755        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1756    }
1757
1758    #[test]
1759    fn test_flow_query_context_conversion_from_query_context_ref() {
1760        use common_time::Timezone;
1761        use session::context::QueryContextBuilder;
1762
1763        let session_ctx = QueryContextBuilder::default()
1764            .current_catalog("session_catalog".to_string())
1765            .current_schema("session_schema".to_string())
1766            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
1767            .build();
1768
1769        let session_ctx_ref = Arc::new(session_ctx);
1770        let flow_ctx: FlowQueryContext = session_ctx_ref.into();
1771
1772        assert_eq!(flow_ctx.catalog, "session_catalog");
1773        assert_eq!(flow_ctx.schema, "session_schema");
1774        assert_eq!(flow_ctx.timezone, "Europe/London");
1775    }
1776
1777    #[test]
1778    fn test_flow_query_context_serialization() {
1779        let flow_ctx = FlowQueryContext {
1780            catalog: "test_catalog".to_string(),
1781            schema: "test_schema".to_string(),
1782            timezone: "UTC".to_string(),
1783        };
1784
1785        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1786        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1787
1788        assert_eq!(flow_ctx, deserialized);
1789
1790        // Verify JSON structure
1791        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1792        assert_eq!(json_value["catalog"], "test_catalog");
1793        assert_eq!(json_value["schema"], "test_schema");
1794        assert_eq!(json_value["timezone"], "UTC");
1795    }
1796
1797    #[test]
1798    fn test_flow_query_context_conversion_to_pb() {
1799        let flow_ctx = FlowQueryContext {
1800            catalog: "pb_catalog".to_string(),
1801            schema: "pb_schema".to_string(),
1802            timezone: "Asia/Tokyo".to_string(),
1803        };
1804
1805        let pb_ctx: PbQueryContext = flow_ctx.into();
1806
1807        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1808        assert_eq!(pb_ctx.current_schema, "pb_schema");
1809        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1810        assert!(pb_ctx.extensions.is_empty());
1811        assert_eq!(pb_ctx.channel, 0);
1812        assert!(pb_ctx.snapshot_seqs.is_none());
1813        assert!(pb_ctx.explain.is_none());
1814    }
1815}