common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34    TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40    QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57    InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
61/// DDL tasks
62#[derive(Debug, Clone)]
63pub enum DdlTask {
64    CreateTable(CreateTableTask),
65    DropTable(DropTableTask),
66    AlterTable(AlterTableTask),
67    TruncateTable(TruncateTableTask),
68    CreateLogicalTables(Vec<CreateTableTask>),
69    DropLogicalTables(Vec<DropTableTask>),
70    AlterLogicalTables(Vec<AlterTableTask>),
71    CreateDatabase(CreateDatabaseTask),
72    DropDatabase(DropDatabaseTask),
73    AlterDatabase(AlterDatabaseTask),
74    CreateFlow(CreateFlowTask),
75    DropFlow(DropFlowTask),
76    #[cfg(feature = "enterprise")]
77    DropTrigger(trigger::DropTriggerTask),
78    CreateView(CreateViewTask),
79    DropView(DropViewTask),
80    #[cfg(feature = "enterprise")]
81    CreateTrigger(trigger::CreateTriggerTask),
82    CommentOn(CommentOnTask),
83}
84
85impl DdlTask {
86    /// Creates a [`DdlTask`] to create a flow.
87    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
88        DdlTask::CreateFlow(expr)
89    }
90
91    /// Creates a [`DdlTask`] to drop a flow.
92    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
93        DdlTask::DropFlow(expr)
94    }
95
96    /// Creates a [`DdlTask`] to drop a view.
97    pub fn new_drop_view(expr: DropViewTask) -> Self {
98        DdlTask::DropView(expr)
99    }
100
101    /// Creates a [`DdlTask`] to create a table.
102    pub fn new_create_table(
103        expr: CreateTableExpr,
104        partitions: Vec<Partition>,
105        table_info: TableInfo,
106    ) -> Self {
107        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
108    }
109
110    /// Creates a [`DdlTask`] to create several logical tables.
111    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
112        DdlTask::CreateLogicalTables(
113            table_data
114                .into_iter()
115                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
116                .collect(),
117        )
118    }
119
120    /// Creates a [`DdlTask`] to alter several logical tables.
121    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
122        DdlTask::AlterLogicalTables(
123            table_data
124                .into_iter()
125                .map(|alter_table| AlterTableTask { alter_table })
126                .collect(),
127        )
128    }
129
130    /// Creates a [`DdlTask`] to drop a table.
131    pub fn new_drop_table(
132        catalog: String,
133        schema: String,
134        table: String,
135        table_id: TableId,
136        drop_if_exists: bool,
137    ) -> Self {
138        DdlTask::DropTable(DropTableTask {
139            catalog,
140            schema,
141            table,
142            table_id,
143            drop_if_exists,
144        })
145    }
146
147    /// Creates a [`DdlTask`] to create a database.
148    pub fn new_create_database(
149        catalog: String,
150        schema: String,
151        create_if_not_exists: bool,
152        options: HashMap<String, String>,
153    ) -> Self {
154        DdlTask::CreateDatabase(CreateDatabaseTask {
155            catalog,
156            schema,
157            create_if_not_exists,
158            options,
159        })
160    }
161
162    /// Creates a [`DdlTask`] to drop a database.
163    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
164        DdlTask::DropDatabase(DropDatabaseTask {
165            catalog,
166            schema,
167            drop_if_exists,
168        })
169    }
170
171    /// Creates a [`DdlTask`] to alter a database.
172    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
173        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
174    }
175
176    /// Creates a [`DdlTask`] to alter a table.
177    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
178        DdlTask::AlterTable(AlterTableTask { alter_table })
179    }
180
181    /// Creates a [`DdlTask`] to truncate a table.
182    pub fn new_truncate_table(
183        catalog: String,
184        schema: String,
185        table: String,
186        table_id: TableId,
187        time_ranges: Vec<(Timestamp, Timestamp)>,
188    ) -> Self {
189        DdlTask::TruncateTable(TruncateTableTask {
190            catalog,
191            schema,
192            table,
193            table_id,
194            time_ranges,
195        })
196    }
197
198    /// Creates a [`DdlTask`] to create a view.
199    pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
200        DdlTask::CreateView(CreateViewTask {
201            create_view,
202            view_info,
203        })
204    }
205
206    /// Creates a [`DdlTask`] to comment on a table, column, or flow.
207    pub fn new_comment_on(task: CommentOnTask) -> Self {
208        DdlTask::CommentOn(task)
209    }
210}
211
212impl TryFrom<Task> for DdlTask {
213    type Error = error::Error;
214    fn try_from(task: Task) -> Result<Self> {
215        match task {
216            Task::CreateTableTask(create_table) => {
217                Ok(DdlTask::CreateTable(create_table.try_into()?))
218            }
219            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
220            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
221            Task::TruncateTableTask(truncate_table) => {
222                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
223            }
224            Task::CreateTableTasks(create_tables) => {
225                let tasks = create_tables
226                    .tasks
227                    .into_iter()
228                    .map(|task| task.try_into())
229                    .collect::<Result<Vec<_>>>()?;
230
231                Ok(DdlTask::CreateLogicalTables(tasks))
232            }
233            Task::DropTableTasks(drop_tables) => {
234                let tasks = drop_tables
235                    .tasks
236                    .into_iter()
237                    .map(|task| task.try_into())
238                    .collect::<Result<Vec<_>>>()?;
239
240                Ok(DdlTask::DropLogicalTables(tasks))
241            }
242            Task::AlterTableTasks(alter_tables) => {
243                let tasks = alter_tables
244                    .tasks
245                    .into_iter()
246                    .map(|task| task.try_into())
247                    .collect::<Result<Vec<_>>>()?;
248
249                Ok(DdlTask::AlterLogicalTables(tasks))
250            }
251            Task::CreateDatabaseTask(create_database) => {
252                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
253            }
254            Task::DropDatabaseTask(drop_database) => {
255                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
256            }
257            Task::AlterDatabaseTask(alter_database) => {
258                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
259            }
260            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
261            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
262            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
263            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
264            Task::CreateTriggerTask(create_trigger) => {
265                #[cfg(feature = "enterprise")]
266                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
267                #[cfg(not(feature = "enterprise"))]
268                {
269                    let _ = create_trigger;
270                    crate::error::UnsupportedSnafu {
271                        operation: "create trigger",
272                    }
273                    .fail()
274                }
275            }
276            Task::DropTriggerTask(drop_trigger) => {
277                #[cfg(feature = "enterprise")]
278                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
279                #[cfg(not(feature = "enterprise"))]
280                {
281                    let _ = drop_trigger;
282                    crate::error::UnsupportedSnafu {
283                        operation: "drop trigger",
284                    }
285                    .fail()
286                }
287            }
288            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
289        }
290    }
291}
292
293#[derive(Clone)]
294pub struct SubmitDdlTaskRequest {
295    pub query_context: QueryContext,
296    pub wait: bool,
297    pub timeout: Duration,
298    pub task: DdlTask,
299}
300
301impl SubmitDdlTaskRequest {
302    /// The default constructor for [`SubmitDdlTaskRequest`].
303    pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
304        Self {
305            query_context,
306            wait: Self::default_wait(),
307            timeout: Self::default_timeout(),
308            task,
309        }
310    }
311
312    /// The default timeout for a DDL task.
313    pub fn default_timeout() -> Duration {
314        Duration::from_secs(60)
315    }
316
317    /// The default wait for a DDL task.
318    pub fn default_wait() -> bool {
319        true
320    }
321}
322
323impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
324    type Error = error::Error;
325
326    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
327        let task = match request.task {
328            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
329            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
330            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
331            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
332            DdlTask::CreateLogicalTables(tasks) => {
333                let tasks = tasks
334                    .into_iter()
335                    .map(|task| task.try_into())
336                    .collect::<Result<Vec<_>>>()?;
337
338                Task::CreateTableTasks(PbCreateTableTasks { tasks })
339            }
340            DdlTask::DropLogicalTables(tasks) => {
341                let tasks = tasks
342                    .into_iter()
343                    .map(|task| task.into())
344                    .collect::<Vec<_>>();
345
346                Task::DropTableTasks(PbDropTableTasks { tasks })
347            }
348            DdlTask::AlterLogicalTables(tasks) => {
349                let tasks = tasks
350                    .into_iter()
351                    .map(|task| task.try_into())
352                    .collect::<Result<Vec<_>>>()?;
353
354                Task::AlterTableTasks(PbAlterTableTasks { tasks })
355            }
356            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
357            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
358            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
359            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
360            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
361            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
362            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
363            #[cfg(feature = "enterprise")]
364            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
365            #[cfg(feature = "enterprise")]
366            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
367            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
368        };
369
370        Ok(Self {
371            header: None,
372            query_context: Some(request.query_context.into()),
373            timeout_secs: request.timeout.as_secs() as u32,
374            wait: request.wait,
375            task: Some(task),
376        })
377    }
378}
379
380#[derive(Debug, Default)]
381pub struct SubmitDdlTaskResponse {
382    pub key: Vec<u8>,
383    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
384    pub table_ids: Vec<TableId>,
385}
386
387impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
388    type Error = error::Error;
389
390    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
391        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
392        Ok(Self {
393            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
394            table_ids,
395        })
396    }
397}
398
399impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
400    fn from(val: SubmitDdlTaskResponse) -> Self {
401        Self {
402            pid: Some(ProcedureId { key: val.key }),
403            table_ids: val
404                .table_ids
405                .into_iter()
406                .map(|id| api::v1::TableId { id })
407                .collect(),
408            ..Default::default()
409        }
410    }
411}
412
413/// A `CREATE VIEW` task.
414#[derive(Debug, PartialEq, Clone)]
415pub struct CreateViewTask {
416    pub create_view: CreateViewExpr,
417    pub view_info: TableInfo,
418}
419
420impl CreateViewTask {
421    /// Returns the [`TableReference`] of view.
422    pub fn table_ref(&self) -> TableReference<'_> {
423        TableReference {
424            catalog: &self.create_view.catalog_name,
425            schema: &self.create_view.schema_name,
426            table: &self.create_view.view_name,
427        }
428    }
429
430    /// Returns the encoded logical plan
431    pub fn raw_logical_plan(&self) -> &Vec<u8> {
432        &self.create_view.logical_plan
433    }
434
435    /// Returns the view definition in SQL
436    pub fn view_definition(&self) -> &str {
437        &self.create_view.definition
438    }
439
440    /// Returns the resolved table names in view's logical plan
441    pub fn table_names(&self) -> HashSet<TableName> {
442        self.create_view
443            .table_names
444            .iter()
445            .map(|t| t.clone().into())
446            .collect()
447    }
448
449    /// Returns the view's columns
450    pub fn columns(&self) -> &Vec<String> {
451        &self.create_view.columns
452    }
453
454    /// Returns the original logical plan's columns
455    pub fn plan_columns(&self) -> &Vec<String> {
456        &self.create_view.plan_columns
457    }
458}
459
460impl TryFrom<PbCreateViewTask> for CreateViewTask {
461    type Error = error::Error;
462
463    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
464        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
465
466        Ok(CreateViewTask {
467            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
468                err_msg: "expected create view",
469            })?,
470            view_info,
471        })
472    }
473}
474
475impl TryFrom<CreateViewTask> for PbCreateViewTask {
476    type Error = error::Error;
477
478    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
479        Ok(PbCreateViewTask {
480            create_view: Some(task.create_view),
481            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
482        })
483    }
484}
485
486impl Serialize for CreateViewTask {
487    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
488    where
489        S: serde::Serializer,
490    {
491        let view_info = serde_json::to_vec(&self.view_info)
492            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
493
494        let pb = PbCreateViewTask {
495            create_view: Some(self.create_view.clone()),
496            view_info,
497        };
498        let buf = pb.encode_to_vec();
499        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
500        serializer.serialize_str(&encoded)
501    }
502}
503
504impl<'de> Deserialize<'de> for CreateViewTask {
505    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
506    where
507        D: serde::Deserializer<'de>,
508    {
509        let encoded = String::deserialize(deserializer)?;
510        let buf = general_purpose::STANDARD_NO_PAD
511            .decode(encoded)
512            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
513        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
514            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
515
516        let expr = CreateViewTask::try_from(expr)
517            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
518
519        Ok(expr)
520    }
521}
522
523/// A `DROP VIEW` task.
524#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
525pub struct DropViewTask {
526    pub catalog: String,
527    pub schema: String,
528    pub view: String,
529    pub view_id: TableId,
530    pub drop_if_exists: bool,
531}
532
533impl DropViewTask {
534    /// Returns the [`TableReference`] of view.
535    pub fn table_ref(&self) -> TableReference<'_> {
536        TableReference {
537            catalog: &self.catalog,
538            schema: &self.schema,
539            table: &self.view,
540        }
541    }
542}
543
544impl TryFrom<PbDropViewTask> for DropViewTask {
545    type Error = error::Error;
546
547    fn try_from(pb: PbDropViewTask) -> Result<Self> {
548        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
549            err_msg: "expected drop view",
550        })?;
551
552        Ok(DropViewTask {
553            catalog: expr.catalog_name,
554            schema: expr.schema_name,
555            view: expr.view_name,
556            view_id: expr
557                .view_id
558                .context(error::InvalidProtoMsgSnafu {
559                    err_msg: "expected view_id",
560                })?
561                .id,
562            drop_if_exists: expr.drop_if_exists,
563        })
564    }
565}
566
567impl From<DropViewTask> for PbDropViewTask {
568    fn from(task: DropViewTask) -> Self {
569        PbDropViewTask {
570            drop_view: Some(DropViewExpr {
571                catalog_name: task.catalog,
572                schema_name: task.schema,
573                view_name: task.view,
574                view_id: Some(api::v1::TableId { id: task.view_id }),
575                drop_if_exists: task.drop_if_exists,
576            }),
577        }
578    }
579}
580
581#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
582pub struct DropTableTask {
583    pub catalog: String,
584    pub schema: String,
585    pub table: String,
586    pub table_id: TableId,
587    #[serde(default)]
588    pub drop_if_exists: bool,
589}
590
591impl DropTableTask {
592    pub fn table_ref(&self) -> TableReference<'_> {
593        TableReference {
594            catalog: &self.catalog,
595            schema: &self.schema,
596            table: &self.table,
597        }
598    }
599
600    pub fn table_name(&self) -> TableName {
601        TableName {
602            catalog_name: self.catalog.clone(),
603            schema_name: self.schema.clone(),
604            table_name: self.table.clone(),
605        }
606    }
607}
608
609impl TryFrom<PbDropTableTask> for DropTableTask {
610    type Error = error::Error;
611
612    fn try_from(pb: PbDropTableTask) -> Result<Self> {
613        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
614            err_msg: "expected drop table",
615        })?;
616
617        Ok(Self {
618            catalog: drop_table.catalog_name,
619            schema: drop_table.schema_name,
620            table: drop_table.table_name,
621            table_id: drop_table
622                .table_id
623                .context(error::InvalidProtoMsgSnafu {
624                    err_msg: "expected table_id",
625                })?
626                .id,
627            drop_if_exists: drop_table.drop_if_exists,
628        })
629    }
630}
631
632impl From<DropTableTask> for PbDropTableTask {
633    fn from(task: DropTableTask) -> Self {
634        PbDropTableTask {
635            drop_table: Some(DropTableExpr {
636                catalog_name: task.catalog,
637                schema_name: task.schema,
638                table_name: task.table,
639                table_id: Some(api::v1::TableId { id: task.table_id }),
640                drop_if_exists: task.drop_if_exists,
641            }),
642        }
643    }
644}
645
646#[derive(Debug, PartialEq, Clone)]
647pub struct CreateTableTask {
648    pub create_table: CreateTableExpr,
649    pub partitions: Vec<Partition>,
650    pub table_info: TableInfo,
651}
652
653impl TryFrom<PbCreateTableTask> for CreateTableTask {
654    type Error = error::Error;
655
656    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
657        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
658
659        Ok(CreateTableTask::new(
660            pb.create_table.context(error::InvalidProtoMsgSnafu {
661                err_msg: "expected create table",
662            })?,
663            pb.partitions,
664            table_info,
665        ))
666    }
667}
668
669impl TryFrom<CreateTableTask> for PbCreateTableTask {
670    type Error = error::Error;
671
672    fn try_from(task: CreateTableTask) -> Result<Self> {
673        Ok(PbCreateTableTask {
674            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
675            create_table: Some(task.create_table),
676            partitions: task.partitions,
677        })
678    }
679}
680
681impl CreateTableTask {
682    pub fn new(
683        expr: CreateTableExpr,
684        partitions: Vec<Partition>,
685        table_info: TableInfo,
686    ) -> CreateTableTask {
687        CreateTableTask {
688            create_table: expr,
689            partitions,
690            table_info,
691        }
692    }
693
694    pub fn table_name(&self) -> TableName {
695        let table = &self.create_table;
696
697        TableName {
698            catalog_name: table.catalog_name.clone(),
699            schema_name: table.schema_name.clone(),
700            table_name: table.table_name.clone(),
701        }
702    }
703
704    pub fn table_ref(&self) -> TableReference<'_> {
705        let table = &self.create_table;
706
707        TableReference {
708            catalog: &table.catalog_name,
709            schema: &table.schema_name,
710            table: &table.table_name,
711        }
712    }
713
714    /// Sets the `table_info`'s table_id.
715    pub fn set_table_id(&mut self, table_id: TableId) {
716        self.table_info.ident.table_id = table_id;
717    }
718
719    /// Sort the columns in [CreateTableExpr] and [TableInfo].
720    ///
721    /// This function won't do any check or verification. Caller should
722    /// ensure this task is valid.
723    pub fn sort_columns(&mut self) {
724        // sort create table expr
725        // sort column_defs by name
726        self.create_table
727            .column_defs
728            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
729
730        self.table_info.sort_columns();
731    }
732}
733
734impl Serialize for CreateTableTask {
735    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
736    where
737        S: serde::Serializer,
738    {
739        let table_info = serde_json::to_vec(&self.table_info)
740            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
741
742        let pb = PbCreateTableTask {
743            create_table: Some(self.create_table.clone()),
744            partitions: self.partitions.clone(),
745            table_info,
746        };
747        let buf = pb.encode_to_vec();
748        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
749        serializer.serialize_str(&encoded)
750    }
751}
752
753impl<'de> Deserialize<'de> for CreateTableTask {
754    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
755    where
756        D: serde::Deserializer<'de>,
757    {
758        let encoded = String::deserialize(deserializer)?;
759        let buf = general_purpose::STANDARD_NO_PAD
760            .decode(encoded)
761            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
762        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
763            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
764
765        let expr = CreateTableTask::try_from(expr)
766            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
767
768        Ok(expr)
769    }
770}
771
772#[derive(Debug, PartialEq, Clone)]
773pub struct AlterTableTask {
774    // TODO(CookiePieWw): Replace proto struct with user-defined struct
775    pub alter_table: AlterTableExpr,
776}
777
778impl AlterTableTask {
779    pub fn validate(&self) -> Result<()> {
780        self.alter_table
781            .kind
782            .as_ref()
783            .context(error::UnexpectedSnafu {
784                err_msg: "'kind' is absent",
785            })?;
786        Ok(())
787    }
788
789    pub fn table_ref(&self) -> TableReference<'_> {
790        TableReference {
791            catalog: &self.alter_table.catalog_name,
792            schema: &self.alter_table.schema_name,
793            table: &self.alter_table.table_name,
794        }
795    }
796
797    pub fn table_name(&self) -> TableName {
798        let table = &self.alter_table;
799
800        TableName {
801            catalog_name: table.catalog_name.clone(),
802            schema_name: table.schema_name.clone(),
803            table_name: table.table_name.clone(),
804        }
805    }
806}
807
808impl TryFrom<PbAlterTableTask> for AlterTableTask {
809    type Error = error::Error;
810
811    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
812        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
813            err_msg: "expected alter_table",
814        })?;
815
816        Ok(AlterTableTask { alter_table })
817    }
818}
819
820impl TryFrom<AlterTableTask> for PbAlterTableTask {
821    type Error = error::Error;
822
823    fn try_from(task: AlterTableTask) -> Result<Self> {
824        Ok(PbAlterTableTask {
825            alter_table: Some(task.alter_table),
826        })
827    }
828}
829
830impl Serialize for AlterTableTask {
831    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
832    where
833        S: serde::Serializer,
834    {
835        let pb = PbAlterTableTask {
836            alter_table: Some(self.alter_table.clone()),
837        };
838        let buf = pb.encode_to_vec();
839        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
840        serializer.serialize_str(&encoded)
841    }
842}
843
844impl<'de> Deserialize<'de> for AlterTableTask {
845    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
846    where
847        D: serde::Deserializer<'de>,
848    {
849        let encoded = String::deserialize(deserializer)?;
850        let buf = general_purpose::STANDARD_NO_PAD
851            .decode(encoded)
852            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
853        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
854            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
855
856        let expr = AlterTableTask::try_from(expr)
857            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
858
859        Ok(expr)
860    }
861}
862
863#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
864pub struct TruncateTableTask {
865    pub catalog: String,
866    pub schema: String,
867    pub table: String,
868    pub table_id: TableId,
869    pub time_ranges: Vec<(Timestamp, Timestamp)>,
870}
871
872impl TruncateTableTask {
873    pub fn table_ref(&self) -> TableReference<'_> {
874        TableReference {
875            catalog: &self.catalog,
876            schema: &self.schema,
877            table: &self.table,
878        }
879    }
880
881    pub fn table_name(&self) -> TableName {
882        TableName {
883            catalog_name: self.catalog.clone(),
884            schema_name: self.schema.clone(),
885            table_name: self.table.clone(),
886        }
887    }
888}
889
890impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
891    type Error = error::Error;
892
893    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
894        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
895            err_msg: "expected truncate table",
896        })?;
897
898        Ok(Self {
899            catalog: truncate_table.catalog_name,
900            schema: truncate_table.schema_name,
901            table: truncate_table.table_name,
902            table_id: truncate_table
903                .table_id
904                .context(error::InvalidProtoMsgSnafu {
905                    err_msg: "expected table_id",
906                })?
907                .id,
908            time_ranges: truncate_table
909                .time_ranges
910                .map(from_pb_time_ranges)
911                .transpose()
912                .map_err(BoxedError::new)
913                .context(ExternalSnafu)?
914                .unwrap_or_default(),
915        })
916    }
917}
918
919impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
920    type Error = error::Error;
921
922    fn try_from(task: TruncateTableTask) -> Result<Self> {
923        Ok(PbTruncateTableTask {
924            truncate_table: Some(TruncateTableExpr {
925                catalog_name: task.catalog,
926                schema_name: task.schema,
927                table_name: task.table,
928                table_id: Some(api::v1::TableId { id: task.table_id }),
929                time_ranges: Some(
930                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
931                ),
932            }),
933        })
934    }
935}
936
937#[serde_as]
938#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
939pub struct CreateDatabaseTask {
940    pub catalog: String,
941    pub schema: String,
942    pub create_if_not_exists: bool,
943    #[serde_as(deserialize_as = "DefaultOnNull")]
944    pub options: HashMap<String, String>,
945}
946
947impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
948    type Error = error::Error;
949
950    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
951        let CreateDatabaseExpr {
952            catalog_name,
953            schema_name,
954            create_if_not_exists,
955            options,
956        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
957            err_msg: "expected create database",
958        })?;
959
960        Ok(CreateDatabaseTask {
961            catalog: catalog_name,
962            schema: schema_name,
963            create_if_not_exists,
964            options,
965        })
966    }
967}
968
969impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
970    type Error = error::Error;
971
972    fn try_from(
973        CreateDatabaseTask {
974            catalog,
975            schema,
976            create_if_not_exists,
977            options,
978        }: CreateDatabaseTask,
979    ) -> Result<Self> {
980        Ok(PbCreateDatabaseTask {
981            create_database: Some(CreateDatabaseExpr {
982                catalog_name: catalog,
983                schema_name: schema,
984                create_if_not_exists,
985                options,
986            }),
987        })
988    }
989}
990
991#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
992pub struct DropDatabaseTask {
993    pub catalog: String,
994    pub schema: String,
995    pub drop_if_exists: bool,
996}
997
998impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
999    type Error = error::Error;
1000
1001    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1002        let DropDatabaseExpr {
1003            catalog_name,
1004            schema_name,
1005            drop_if_exists,
1006        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1007            err_msg: "expected drop database",
1008        })?;
1009
1010        Ok(DropDatabaseTask {
1011            catalog: catalog_name,
1012            schema: schema_name,
1013            drop_if_exists,
1014        })
1015    }
1016}
1017
1018impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1019    type Error = error::Error;
1020
1021    fn try_from(
1022        DropDatabaseTask {
1023            catalog,
1024            schema,
1025            drop_if_exists,
1026        }: DropDatabaseTask,
1027    ) -> Result<Self> {
1028        Ok(PbDropDatabaseTask {
1029            drop_database: Some(DropDatabaseExpr {
1030                catalog_name: catalog,
1031                schema_name: schema,
1032                drop_if_exists,
1033            }),
1034        })
1035    }
1036}
1037
1038#[derive(Debug, PartialEq, Clone)]
1039pub struct AlterDatabaseTask {
1040    pub alter_expr: AlterDatabaseExpr,
1041}
1042
1043impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1044    type Error = error::Error;
1045
1046    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1047        Ok(PbAlterDatabaseTask {
1048            task: Some(task.alter_expr),
1049        })
1050    }
1051}
1052
1053impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1054    type Error = error::Error;
1055
1056    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1057        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1058            err_msg: "expected alter database",
1059        })?;
1060
1061        Ok(AlterDatabaseTask { alter_expr })
1062    }
1063}
1064
1065impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1066    type Error = error::Error;
1067
1068    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1069        match pb {
1070            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1071                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1072                    options
1073                        .set_database_options
1074                        .into_iter()
1075                        .map(SetDatabaseOption::try_from)
1076                        .collect::<Result<Vec<_>>>()?,
1077                )))
1078            }
1079            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1080                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1081                    options
1082                        .keys
1083                        .iter()
1084                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1085                        .collect::<Result<Vec<_>>>()?,
1086                )),
1087            ),
1088        }
1089    }
1090}
1091
1092const TTL_KEY: &str = "ttl";
1093
1094impl TryFrom<PbOption> for SetDatabaseOption {
1095    type Error = error::Error;
1096
1097    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1098        let key_lower = key.to_ascii_lowercase();
1099        match key_lower.as_str() {
1100            TTL_KEY => {
1101                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1102                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1103
1104                Ok(SetDatabaseOption::Ttl(ttl))
1105            }
1106            _ => {
1107                if validate_database_option(&key_lower) {
1108                    Ok(SetDatabaseOption::Other(key_lower, value))
1109                } else {
1110                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1111                }
1112            }
1113        }
1114    }
1115}
1116
1117#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1118pub enum SetDatabaseOption {
1119    Ttl(DatabaseTimeToLive),
1120    Other(String, String),
1121}
1122
1123#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1124pub enum UnsetDatabaseOption {
1125    Ttl,
1126    Other(String),
1127}
1128
1129impl TryFrom<&str> for UnsetDatabaseOption {
1130    type Error = error::Error;
1131
1132    fn try_from(key: &str) -> Result<Self> {
1133        let key_lower = key.to_ascii_lowercase();
1134        match key_lower.as_str() {
1135            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1136            _ => {
1137                if validate_database_option(&key_lower) {
1138                    Ok(UnsetDatabaseOption::Other(key_lower))
1139                } else {
1140                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1141                }
1142            }
1143        }
1144    }
1145}
1146
1147#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1148pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1149
1150#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1151pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1152
1153#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1154pub enum AlterDatabaseKind {
1155    SetDatabaseOptions(SetDatabaseOptions),
1156    UnsetDatabaseOptions(UnsetDatabaseOptions),
1157}
1158
1159impl AlterDatabaseTask {
1160    pub fn catalog(&self) -> &str {
1161        &self.alter_expr.catalog_name
1162    }
1163
1164    pub fn schema(&self) -> &str {
1165        &self.alter_expr.catalog_name
1166    }
1167}
1168
1169/// Create flow
1170#[derive(Debug, Clone, Serialize, Deserialize)]
1171pub struct CreateFlowTask {
1172    pub catalog_name: String,
1173    pub flow_name: String,
1174    pub source_table_names: Vec<TableName>,
1175    pub sink_table_name: TableName,
1176    pub or_replace: bool,
1177    pub create_if_not_exists: bool,
1178    /// Duration in seconds. Data older than this duration will not be used.
1179    pub expire_after: Option<i64>,
1180    pub eval_interval_secs: Option<i64>,
1181    pub comment: String,
1182    pub sql: String,
1183    pub flow_options: HashMap<String, String>,
1184}
1185
1186impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1187    type Error = error::Error;
1188
1189    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1190        let CreateFlowExpr {
1191            catalog_name,
1192            flow_name,
1193            source_table_names,
1194            sink_table_name,
1195            or_replace,
1196            create_if_not_exists,
1197            expire_after,
1198            eval_interval,
1199            comment,
1200            sql,
1201            flow_options,
1202        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1203            err_msg: "expected create_flow",
1204        })?;
1205
1206        Ok(CreateFlowTask {
1207            catalog_name,
1208            flow_name,
1209            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1210            sink_table_name: sink_table_name
1211                .context(error::InvalidProtoMsgSnafu {
1212                    err_msg: "expected sink_table_name",
1213                })?
1214                .into(),
1215            or_replace,
1216            create_if_not_exists,
1217            expire_after: expire_after.map(|e| e.value),
1218            eval_interval_secs: eval_interval.map(|e| e.seconds),
1219            comment,
1220            sql,
1221            flow_options,
1222        })
1223    }
1224}
1225
1226impl From<CreateFlowTask> for PbCreateFlowTask {
1227    fn from(
1228        CreateFlowTask {
1229            catalog_name,
1230            flow_name,
1231            source_table_names,
1232            sink_table_name,
1233            or_replace,
1234            create_if_not_exists,
1235            expire_after,
1236            eval_interval_secs: eval_interval,
1237            comment,
1238            sql,
1239            flow_options,
1240        }: CreateFlowTask,
1241    ) -> Self {
1242        PbCreateFlowTask {
1243            create_flow: Some(CreateFlowExpr {
1244                catalog_name,
1245                flow_name,
1246                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1247                sink_table_name: Some(sink_table_name.into()),
1248                or_replace,
1249                create_if_not_exists,
1250                expire_after: expire_after.map(|value| ExpireAfter { value }),
1251                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1252                comment,
1253                sql,
1254                flow_options,
1255            }),
1256        }
1257    }
1258}
1259
1260/// Drop flow
1261#[derive(Debug, Clone, Serialize, Deserialize)]
1262pub struct DropFlowTask {
1263    pub catalog_name: String,
1264    pub flow_name: String,
1265    pub flow_id: FlowId,
1266    pub drop_if_exists: bool,
1267}
1268
1269impl TryFrom<PbDropFlowTask> for DropFlowTask {
1270    type Error = error::Error;
1271
1272    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1273        let DropFlowExpr {
1274            catalog_name,
1275            flow_name,
1276            flow_id,
1277            drop_if_exists,
1278        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1279            err_msg: "expected drop_flow",
1280        })?;
1281        let flow_id = flow_id
1282            .context(error::InvalidProtoMsgSnafu {
1283                err_msg: "expected flow_id",
1284            })?
1285            .id;
1286        Ok(DropFlowTask {
1287            catalog_name,
1288            flow_name,
1289            flow_id,
1290            drop_if_exists,
1291        })
1292    }
1293}
1294
1295impl From<DropFlowTask> for PbDropFlowTask {
1296    fn from(
1297        DropFlowTask {
1298            catalog_name,
1299            flow_name,
1300            flow_id,
1301            drop_if_exists,
1302        }: DropFlowTask,
1303    ) -> Self {
1304        PbDropFlowTask {
1305            drop_flow: Some(DropFlowExpr {
1306                catalog_name,
1307                flow_name,
1308                flow_id: Some(api::v1::FlowId { id: flow_id }),
1309                drop_if_exists,
1310            }),
1311        }
1312    }
1313}
1314
1315/// Represents the ID of the object being commented on (Table or Flow).
1316#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1317pub enum CommentObjectId {
1318    Table(TableId),
1319    Flow(FlowId),
1320}
1321
1322/// Comment on table, column, or flow
1323#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1324pub struct CommentOnTask {
1325    pub catalog_name: String,
1326    pub schema_name: String,
1327    pub object_type: CommentObjectType,
1328    pub object_name: String,
1329    /// Column name (only for Column comments)
1330    pub column_name: Option<String>,
1331    /// Object ID (Table or Flow) for validation and cache invalidation
1332    pub object_id: Option<CommentObjectId>,
1333    pub comment: Option<String>,
1334}
1335
1336#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1337pub enum CommentObjectType {
1338    Table,
1339    Column,
1340    Flow,
1341}
1342
1343impl CommentOnTask {
1344    pub fn table_ref(&self) -> TableReference<'_> {
1345        TableReference {
1346            catalog: &self.catalog_name,
1347            schema: &self.schema_name,
1348            table: &self.object_name,
1349        }
1350    }
1351}
1352
1353// Proto conversions for CommentObjectType
1354impl From<CommentObjectType> for PbCommentObjectType {
1355    fn from(object_type: CommentObjectType) -> Self {
1356        match object_type {
1357            CommentObjectType::Table => PbCommentObjectType::Table,
1358            CommentObjectType::Column => PbCommentObjectType::Column,
1359            CommentObjectType::Flow => PbCommentObjectType::Flow,
1360        }
1361    }
1362}
1363
1364impl TryFrom<i32> for CommentObjectType {
1365    type Error = error::Error;
1366
1367    fn try_from(value: i32) -> Result<Self> {
1368        match value {
1369            0 => Ok(CommentObjectType::Table),
1370            1 => Ok(CommentObjectType::Column),
1371            2 => Ok(CommentObjectType::Flow),
1372            _ => error::InvalidProtoMsgSnafu {
1373                err_msg: format!(
1374                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1375                    value
1376                ),
1377            }
1378            .fail(),
1379        }
1380    }
1381}
1382
1383// Proto conversions for CommentOnTask
1384impl TryFrom<PbCommentOnTask> for CommentOnTask {
1385    type Error = error::Error;
1386
1387    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1388        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1389            err_msg: "expected comment_on",
1390        })?;
1391
1392        Ok(CommentOnTask {
1393            catalog_name: comment_on.catalog_name,
1394            schema_name: comment_on.schema_name,
1395            object_type: comment_on.object_type.try_into()?,
1396            object_name: comment_on.object_name,
1397            column_name: if comment_on.column_name.is_empty() {
1398                None
1399            } else {
1400                Some(comment_on.column_name)
1401            },
1402            comment: if comment_on.comment.is_empty() {
1403                None
1404            } else {
1405                Some(comment_on.comment)
1406            },
1407            object_id: None,
1408        })
1409    }
1410}
1411
1412impl From<CommentOnTask> for PbCommentOnTask {
1413    fn from(task: CommentOnTask) -> Self {
1414        let pb_object_type: PbCommentObjectType = task.object_type.into();
1415        PbCommentOnTask {
1416            comment_on: Some(CommentOnExpr {
1417                catalog_name: task.catalog_name,
1418                schema_name: task.schema_name,
1419                object_type: pb_object_type as i32,
1420                object_name: task.object_name,
1421                column_name: task.column_name.unwrap_or_default(),
1422                comment: task.comment.unwrap_or_default(),
1423            }),
1424        }
1425    }
1426}
1427
1428#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
1429pub struct QueryContext {
1430    pub current_catalog: String,
1431    pub current_schema: String,
1432    pub timezone: String,
1433    pub extensions: HashMap<String, String>,
1434    pub channel: u8,
1435}
1436
1437impl QueryContext {
1438    /// Get the current catalog
1439    pub fn current_catalog(&self) -> &str {
1440        &self.current_catalog
1441    }
1442
1443    /// Get the current schema
1444    pub fn current_schema(&self) -> &str {
1445        &self.current_schema
1446    }
1447
1448    /// Get the timezone
1449    pub fn timezone(&self) -> &str {
1450        &self.timezone
1451    }
1452
1453    /// Get the extensions
1454    pub fn extensions(&self) -> &HashMap<String, String> {
1455        &self.extensions
1456    }
1457
1458    /// Get the channel
1459    pub fn channel(&self) -> u8 {
1460        self.channel
1461    }
1462}
1463
1464/// Lightweight query context for flow operations containing only essential fields.
1465/// This is a subset of QueryContext that includes only the fields actually needed
1466/// for flow creation and execution.
1467#[derive(Debug, Clone, Serialize, PartialEq)]
1468pub struct FlowQueryContext {
1469    /// Current catalog name - needed for flow metadata and recovery
1470    pub catalog: String,
1471    /// Current schema name - needed for table resolution during flow execution
1472    pub schema: String,
1473    /// Timezone for timestamp operations in the flow
1474    pub timezone: String,
1475}
1476
1477impl<'de> Deserialize<'de> for FlowQueryContext {
1478    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1479    where
1480        D: serde::Deserializer<'de>,
1481    {
1482        // Support both QueryContext format and FlowQueryContext format
1483        #[derive(Deserialize)]
1484        #[serde(untagged)]
1485        enum ContextCompat {
1486            Flow(FlowQueryContextHelper),
1487            Full(QueryContext),
1488        }
1489
1490        #[derive(Deserialize)]
1491        struct FlowQueryContextHelper {
1492            catalog: String,
1493            schema: String,
1494            timezone: String,
1495        }
1496
1497        match ContextCompat::deserialize(deserializer)? {
1498            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1499                catalog: helper.catalog,
1500                schema: helper.schema,
1501                timezone: helper.timezone,
1502            }),
1503            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1504        }
1505    }
1506}
1507
1508impl From<PbQueryContext> for QueryContext {
1509    fn from(pb_ctx: PbQueryContext) -> Self {
1510        Self {
1511            current_catalog: pb_ctx.current_catalog,
1512            current_schema: pb_ctx.current_schema,
1513            timezone: pb_ctx.timezone,
1514            extensions: pb_ctx.extensions,
1515            channel: pb_ctx.channel as u8,
1516        }
1517    }
1518}
1519
1520impl From<QueryContext> for PbQueryContext {
1521    fn from(
1522        QueryContext {
1523            current_catalog,
1524            current_schema,
1525            timezone,
1526            extensions,
1527            channel,
1528        }: QueryContext,
1529    ) -> Self {
1530        PbQueryContext {
1531            current_catalog,
1532            current_schema,
1533            timezone,
1534            extensions,
1535            channel: channel as u32,
1536            snapshot_seqs: None,
1537            explain: None,
1538        }
1539    }
1540}
1541
1542impl From<QueryContext> for FlowQueryContext {
1543    fn from(ctx: QueryContext) -> Self {
1544        Self {
1545            catalog: ctx.current_catalog,
1546            schema: ctx.current_schema,
1547            timezone: ctx.timezone,
1548        }
1549    }
1550}
1551
1552impl From<FlowQueryContext> for QueryContext {
1553    fn from(flow_ctx: FlowQueryContext) -> Self {
1554        Self {
1555            current_catalog: flow_ctx.catalog,
1556            current_schema: flow_ctx.schema,
1557            timezone: flow_ctx.timezone,
1558            extensions: HashMap::new(),
1559            channel: 0, // Use default channel for flows
1560        }
1561    }
1562}
1563
1564impl From<FlowQueryContext> for PbQueryContext {
1565    fn from(flow_ctx: FlowQueryContext) -> Self {
1566        let query_ctx: QueryContext = flow_ctx.into();
1567        query_ctx.into()
1568    }
1569}
1570
1571#[cfg(test)]
1572mod tests {
1573    use std::sync::Arc;
1574
1575    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1576    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
1577    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1578    use store_api::storage::ConcreteDataType;
1579    use table::metadata::{TableInfo, TableMeta, TableType};
1580    use table::test_util::table_info::test_table_info;
1581
1582    use super::{AlterTableTask, CreateTableTask, *};
1583
1584    #[test]
1585    fn test_basic_ser_de_create_table_task() {
1586        let schema = SchemaBuilder::default().build().unwrap();
1587        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1588        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);
1589
1590        let output = serde_json::to_vec(&task).unwrap();
1591
1592        let de = serde_json::from_slice(&output).unwrap();
1593        assert_eq!(task, de);
1594    }
1595
1596    #[test]
1597    fn test_basic_ser_de_alter_table_task() {
1598        let task = AlterTableTask {
1599            alter_table: AlterTableExpr::default(),
1600        };
1601
1602        let output = serde_json::to_vec(&task).unwrap();
1603
1604        let de = serde_json::from_slice(&output).unwrap();
1605        assert_eq!(task, de);
1606    }
1607
1608    #[test]
1609    fn test_sort_columns() {
1610        // construct RawSchema
1611        let schema = Arc::new(Schema::new(vec![
1612            ColumnSchema::new(
1613                "column3".to_string(),
1614                ConcreteDataType::string_datatype(),
1615                true,
1616            ),
1617            ColumnSchema::new(
1618                "column1".to_string(),
1619                ConcreteDataType::timestamp_millisecond_datatype(),
1620                false,
1621            )
1622            .with_time_index(true),
1623            ColumnSchema::new(
1624                "column2".to_string(),
1625                ConcreteDataType::float64_datatype(),
1626                true,
1627            ),
1628        ]));
1629
1630        // construct RawTableMeta
1631        let meta = TableMeta {
1632            schema,
1633            primary_key_indices: vec![0],
1634            value_indices: vec![2],
1635            engine: METRIC_ENGINE_NAME.to_string(),
1636            next_column_id: 0,
1637            options: Default::default(),
1638            created_on: Default::default(),
1639            updated_on: Default::default(),
1640            partition_key_indices: Default::default(),
1641            column_ids: Default::default(),
1642        };
1643
1644        // construct TableInfo
1645        let raw_table_info = TableInfo {
1646            ident: Default::default(),
1647            meta,
1648            name: Default::default(),
1649            desc: Default::default(),
1650            catalog_name: Default::default(),
1651            schema_name: Default::default(),
1652            table_type: TableType::Base,
1653        };
1654
1655        // construct create table expr
1656        let create_table_expr = CreateTableExpr {
1657            column_defs: vec![
1658                ColumnDef {
1659                    name: "column3".to_string(),
1660                    semantic_type: SemanticType::Tag as i32,
1661                    ..Default::default()
1662                },
1663                ColumnDef {
1664                    name: "column1".to_string(),
1665                    semantic_type: SemanticType::Timestamp as i32,
1666                    ..Default::default()
1667                },
1668                ColumnDef {
1669                    name: "column2".to_string(),
1670                    semantic_type: SemanticType::Field as i32,
1671                    ..Default::default()
1672                },
1673            ],
1674            primary_keys: vec!["column3".to_string()],
1675            ..Default::default()
1676        };
1677
1678        let mut create_table_task =
1679            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1680
1681        // Call the sort_columns method
1682        create_table_task.sort_columns();
1683
1684        // Assert that the columns are sorted correctly
1685        assert_eq!(
1686            create_table_task.create_table.column_defs[0].name,
1687            "column1".to_string()
1688        );
1689        assert_eq!(
1690            create_table_task.create_table.column_defs[1].name,
1691            "column2".to_string()
1692        );
1693        assert_eq!(
1694            create_table_task.create_table.column_defs[2].name,
1695            "column3".to_string()
1696        );
1697
1698        // Assert that the table_info is updated correctly
1699        assert_eq!(
1700            create_table_task.table_info.meta.schema.timestamp_index(),
1701            Some(0)
1702        );
1703        assert_eq!(
1704            create_table_task.table_info.meta.primary_key_indices,
1705            vec![2]
1706        );
1707        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1708    }
1709
1710    #[test]
1711    fn test_flow_query_context_conversion_from_query_context() {
1712        use std::collections::HashMap;
1713        let mut extensions = HashMap::new();
1714        extensions.insert("key1".to_string(), "value1".to_string());
1715        extensions.insert("key2".to_string(), "value2".to_string());
1716
1717        let query_ctx = QueryContext {
1718            current_catalog: "test_catalog".to_string(),
1719            current_schema: "test_schema".to_string(),
1720            timezone: "UTC".to_string(),
1721            extensions,
1722            channel: 5,
1723        };
1724
1725        let flow_ctx: FlowQueryContext = query_ctx.into();
1726
1727        assert_eq!(flow_ctx.catalog, "test_catalog");
1728        assert_eq!(flow_ctx.schema, "test_schema");
1729        assert_eq!(flow_ctx.timezone, "UTC");
1730    }
1731
1732    #[test]
1733    fn test_flow_query_context_conversion_to_query_context() {
1734        let flow_ctx = FlowQueryContext {
1735            catalog: "prod_catalog".to_string(),
1736            schema: "public".to_string(),
1737            timezone: "America/New_York".to_string(),
1738        };
1739
1740        let query_ctx: QueryContext = flow_ctx.clone().into();
1741
1742        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1743        assert_eq!(query_ctx.current_schema, "public");
1744        assert_eq!(query_ctx.timezone, "America/New_York");
1745        assert!(query_ctx.extensions.is_empty());
1746        assert_eq!(query_ctx.channel, 0);
1747
1748        // Test roundtrip conversion
1749        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1750        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1751    }
1752
1753    #[test]
1754    fn test_flow_query_context_serialization() {
1755        let flow_ctx = FlowQueryContext {
1756            catalog: "test_catalog".to_string(),
1757            schema: "test_schema".to_string(),
1758            timezone: "UTC".to_string(),
1759        };
1760
1761        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1762        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1763
1764        assert_eq!(flow_ctx, deserialized);
1765
1766        // Verify JSON structure
1767        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1768        assert_eq!(json_value["catalog"], "test_catalog");
1769        assert_eq!(json_value["schema"], "test_schema");
1770        assert_eq!(json_value["timezone"], "UTC");
1771    }
1772
1773    #[test]
1774    fn test_flow_query_context_conversion_to_pb() {
1775        let flow_ctx = FlowQueryContext {
1776            catalog: "pb_catalog".to_string(),
1777            schema: "pb_schema".to_string(),
1778            timezone: "Asia/Tokyo".to_string(),
1779        };
1780
1781        let pb_ctx: PbQueryContext = flow_ctx.into();
1782
1783        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1784        assert_eq!(pb_ctx.current_schema, "pb_schema");
1785        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1786        assert!(pb_ctx.extensions.is_empty());
1787        assert_eq!(pb_ctx.channel, 0);
1788        assert!(pb_ctx.snapshot_seqs.is_none());
1789        assert!(pb_ctx.explain.is_none());
1790    }
1791}