common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34    TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40    QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use session::context::{QueryContextBuilder, QueryContextRef};
50use snafu::{OptionExt, ResultExt};
51use table::metadata::{RawTableInfo, TableId};
52use table::requests::validate_database_option;
53use table::table_name::TableName;
54use table::table_reference::TableReference;
55
56use crate::error::{
57    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
58    InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
59};
60use crate::key::FlowId;
61
62/// DDL tasks
63#[derive(Debug, Clone)]
64pub enum DdlTask {
65    CreateTable(CreateTableTask),
66    DropTable(DropTableTask),
67    AlterTable(AlterTableTask),
68    TruncateTable(TruncateTableTask),
69    CreateLogicalTables(Vec<CreateTableTask>),
70    DropLogicalTables(Vec<DropTableTask>),
71    AlterLogicalTables(Vec<AlterTableTask>),
72    CreateDatabase(CreateDatabaseTask),
73    DropDatabase(DropDatabaseTask),
74    AlterDatabase(AlterDatabaseTask),
75    CreateFlow(CreateFlowTask),
76    DropFlow(DropFlowTask),
77    #[cfg(feature = "enterprise")]
78    DropTrigger(trigger::DropTriggerTask),
79    CreateView(CreateViewTask),
80    DropView(DropViewTask),
81    #[cfg(feature = "enterprise")]
82    CreateTrigger(trigger::CreateTriggerTask),
83    CommentOn(CommentOnTask),
84}
85
86impl DdlTask {
87    /// Creates a [`DdlTask`] to create a flow.
88    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
89        DdlTask::CreateFlow(expr)
90    }
91
92    /// Creates a [`DdlTask`] to drop a flow.
93    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
94        DdlTask::DropFlow(expr)
95    }
96
97    /// Creates a [`DdlTask`] to drop a view.
98    pub fn new_drop_view(expr: DropViewTask) -> Self {
99        DdlTask::DropView(expr)
100    }
101
102    /// Creates a [`DdlTask`] to create a table.
103    pub fn new_create_table(
104        expr: CreateTableExpr,
105        partitions: Vec<Partition>,
106        table_info: RawTableInfo,
107    ) -> Self {
108        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
109    }
110
111    /// Creates a [`DdlTask`] to create several logical tables.
112    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
113        DdlTask::CreateLogicalTables(
114            table_data
115                .into_iter()
116                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
117                .collect(),
118        )
119    }
120
121    /// Creates a [`DdlTask`] to alter several logical tables.
122    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
123        DdlTask::AlterLogicalTables(
124            table_data
125                .into_iter()
126                .map(|alter_table| AlterTableTask { alter_table })
127                .collect(),
128        )
129    }
130
131    /// Creates a [`DdlTask`] to drop a table.
132    pub fn new_drop_table(
133        catalog: String,
134        schema: String,
135        table: String,
136        table_id: TableId,
137        drop_if_exists: bool,
138    ) -> Self {
139        DdlTask::DropTable(DropTableTask {
140            catalog,
141            schema,
142            table,
143            table_id,
144            drop_if_exists,
145        })
146    }
147
148    /// Creates a [`DdlTask`] to create a database.
149    pub fn new_create_database(
150        catalog: String,
151        schema: String,
152        create_if_not_exists: bool,
153        options: HashMap<String, String>,
154    ) -> Self {
155        DdlTask::CreateDatabase(CreateDatabaseTask {
156            catalog,
157            schema,
158            create_if_not_exists,
159            options,
160        })
161    }
162
163    /// Creates a [`DdlTask`] to drop a database.
164    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
165        DdlTask::DropDatabase(DropDatabaseTask {
166            catalog,
167            schema,
168            drop_if_exists,
169        })
170    }
171
172    /// Creates a [`DdlTask`] to alter a database.
173    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
174        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
175    }
176
177    /// Creates a [`DdlTask`] to alter a table.
178    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
179        DdlTask::AlterTable(AlterTableTask { alter_table })
180    }
181
182    /// Creates a [`DdlTask`] to truncate a table.
183    pub fn new_truncate_table(
184        catalog: String,
185        schema: String,
186        table: String,
187        table_id: TableId,
188        time_ranges: Vec<(Timestamp, Timestamp)>,
189    ) -> Self {
190        DdlTask::TruncateTable(TruncateTableTask {
191            catalog,
192            schema,
193            table,
194            table_id,
195            time_ranges,
196        })
197    }
198
199    /// Creates a [`DdlTask`] to create a view.
200    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
201        DdlTask::CreateView(CreateViewTask {
202            create_view,
203            view_info,
204        })
205    }
206
207    /// Creates a [`DdlTask`] to comment on a table, column, or flow.
208    pub fn new_comment_on(task: CommentOnTask) -> Self {
209        DdlTask::CommentOn(task)
210    }
211}
212
213impl TryFrom<Task> for DdlTask {
214    type Error = error::Error;
215    fn try_from(task: Task) -> Result<Self> {
216        match task {
217            Task::CreateTableTask(create_table) => {
218                Ok(DdlTask::CreateTable(create_table.try_into()?))
219            }
220            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
221            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
222            Task::TruncateTableTask(truncate_table) => {
223                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
224            }
225            Task::CreateTableTasks(create_tables) => {
226                let tasks = create_tables
227                    .tasks
228                    .into_iter()
229                    .map(|task| task.try_into())
230                    .collect::<Result<Vec<_>>>()?;
231
232                Ok(DdlTask::CreateLogicalTables(tasks))
233            }
234            Task::DropTableTasks(drop_tables) => {
235                let tasks = drop_tables
236                    .tasks
237                    .into_iter()
238                    .map(|task| task.try_into())
239                    .collect::<Result<Vec<_>>>()?;
240
241                Ok(DdlTask::DropLogicalTables(tasks))
242            }
243            Task::AlterTableTasks(alter_tables) => {
244                let tasks = alter_tables
245                    .tasks
246                    .into_iter()
247                    .map(|task| task.try_into())
248                    .collect::<Result<Vec<_>>>()?;
249
250                Ok(DdlTask::AlterLogicalTables(tasks))
251            }
252            Task::CreateDatabaseTask(create_database) => {
253                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
254            }
255            Task::DropDatabaseTask(drop_database) => {
256                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
257            }
258            Task::AlterDatabaseTask(alter_database) => {
259                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
260            }
261            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
262            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
263            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
264            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
265            Task::CreateTriggerTask(create_trigger) => {
266                #[cfg(feature = "enterprise")]
267                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
268                #[cfg(not(feature = "enterprise"))]
269                {
270                    let _ = create_trigger;
271                    crate::error::UnsupportedSnafu {
272                        operation: "create trigger",
273                    }
274                    .fail()
275                }
276            }
277            Task::DropTriggerTask(drop_trigger) => {
278                #[cfg(feature = "enterprise")]
279                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
280                #[cfg(not(feature = "enterprise"))]
281                {
282                    let _ = drop_trigger;
283                    crate::error::UnsupportedSnafu {
284                        operation: "drop trigger",
285                    }
286                    .fail()
287                }
288            }
289            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
290        }
291    }
292}
293
294#[derive(Clone)]
295pub struct SubmitDdlTaskRequest {
296    pub query_context: QueryContextRef,
297    pub wait: bool,
298    pub timeout: Duration,
299    pub task: DdlTask,
300}
301
302impl SubmitDdlTaskRequest {
303    /// The default constructor for [`SubmitDdlTaskRequest`].
304    pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self {
305        Self {
306            query_context,
307            wait: Self::default_wait(),
308            timeout: Self::default_timeout(),
309            task,
310        }
311    }
312
313    /// The default timeout for a DDL task.
314    pub fn default_timeout() -> Duration {
315        Duration::from_secs(60)
316    }
317
318    /// The default wait for a DDL task.
319    pub fn default_wait() -> bool {
320        true
321    }
322}
323
324impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
325    type Error = error::Error;
326
327    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
328        let task = match request.task {
329            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
330            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
331            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
332            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
333            DdlTask::CreateLogicalTables(tasks) => {
334                let tasks = tasks
335                    .into_iter()
336                    .map(|task| task.try_into())
337                    .collect::<Result<Vec<_>>>()?;
338
339                Task::CreateTableTasks(PbCreateTableTasks { tasks })
340            }
341            DdlTask::DropLogicalTables(tasks) => {
342                let tasks = tasks
343                    .into_iter()
344                    .map(|task| task.into())
345                    .collect::<Vec<_>>();
346
347                Task::DropTableTasks(PbDropTableTasks { tasks })
348            }
349            DdlTask::AlterLogicalTables(tasks) => {
350                let tasks = tasks
351                    .into_iter()
352                    .map(|task| task.try_into())
353                    .collect::<Result<Vec<_>>>()?;
354
355                Task::AlterTableTasks(PbAlterTableTasks { tasks })
356            }
357            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
358            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
359            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
360            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
361            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
362            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
363            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
364            #[cfg(feature = "enterprise")]
365            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
366            #[cfg(feature = "enterprise")]
367            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
368            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
369        };
370
371        Ok(Self {
372            header: None,
373            query_context: Some((*request.query_context).clone().into()),
374            timeout_secs: request.timeout.as_secs() as u32,
375            wait: request.wait,
376            task: Some(task),
377        })
378    }
379}
380
381#[derive(Debug, Default)]
382pub struct SubmitDdlTaskResponse {
383    pub key: Vec<u8>,
384    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
385    pub table_ids: Vec<TableId>,
386}
387
388impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
389    type Error = error::Error;
390
391    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
392        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
393        Ok(Self {
394            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
395            table_ids,
396        })
397    }
398}
399
400impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
401    fn from(val: SubmitDdlTaskResponse) -> Self {
402        Self {
403            pid: Some(ProcedureId { key: val.key }),
404            table_ids: val
405                .table_ids
406                .into_iter()
407                .map(|id| api::v1::TableId { id })
408                .collect(),
409            ..Default::default()
410        }
411    }
412}
413
414/// A `CREATE VIEW` task.
415#[derive(Debug, PartialEq, Clone)]
416pub struct CreateViewTask {
417    pub create_view: CreateViewExpr,
418    pub view_info: RawTableInfo,
419}
420
421impl CreateViewTask {
422    /// Returns the [`TableReference`] of view.
423    pub fn table_ref(&self) -> TableReference<'_> {
424        TableReference {
425            catalog: &self.create_view.catalog_name,
426            schema: &self.create_view.schema_name,
427            table: &self.create_view.view_name,
428        }
429    }
430
431    /// Returns the encoded logical plan
432    pub fn raw_logical_plan(&self) -> &Vec<u8> {
433        &self.create_view.logical_plan
434    }
435
436    /// Returns the view definition in SQL
437    pub fn view_definition(&self) -> &str {
438        &self.create_view.definition
439    }
440
441    /// Returns the resolved table names in view's logical plan
442    pub fn table_names(&self) -> HashSet<TableName> {
443        self.create_view
444            .table_names
445            .iter()
446            .map(|t| t.clone().into())
447            .collect()
448    }
449
450    /// Returns the view's columns
451    pub fn columns(&self) -> &Vec<String> {
452        &self.create_view.columns
453    }
454
455    /// Returns the original logical plan's columns
456    pub fn plan_columns(&self) -> &Vec<String> {
457        &self.create_view.plan_columns
458    }
459}
460
461impl TryFrom<PbCreateViewTask> for CreateViewTask {
462    type Error = error::Error;
463
464    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
465        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
466
467        Ok(CreateViewTask {
468            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
469                err_msg: "expected create view",
470            })?,
471            view_info,
472        })
473    }
474}
475
476impl TryFrom<CreateViewTask> for PbCreateViewTask {
477    type Error = error::Error;
478
479    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
480        Ok(PbCreateViewTask {
481            create_view: Some(task.create_view),
482            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
483        })
484    }
485}
486
487impl Serialize for CreateViewTask {
488    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
489    where
490        S: serde::Serializer,
491    {
492        let view_info = serde_json::to_vec(&self.view_info)
493            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
494
495        let pb = PbCreateViewTask {
496            create_view: Some(self.create_view.clone()),
497            view_info,
498        };
499        let buf = pb.encode_to_vec();
500        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
501        serializer.serialize_str(&encoded)
502    }
503}
504
505impl<'de> Deserialize<'de> for CreateViewTask {
506    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
507    where
508        D: serde::Deserializer<'de>,
509    {
510        let encoded = String::deserialize(deserializer)?;
511        let buf = general_purpose::STANDARD_NO_PAD
512            .decode(encoded)
513            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
514        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
515            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
516
517        let expr = CreateViewTask::try_from(expr)
518            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
519
520        Ok(expr)
521    }
522}
523
524/// A `DROP VIEW` task.
525#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
526pub struct DropViewTask {
527    pub catalog: String,
528    pub schema: String,
529    pub view: String,
530    pub view_id: TableId,
531    pub drop_if_exists: bool,
532}
533
534impl DropViewTask {
535    /// Returns the [`TableReference`] of view.
536    pub fn table_ref(&self) -> TableReference<'_> {
537        TableReference {
538            catalog: &self.catalog,
539            schema: &self.schema,
540            table: &self.view,
541        }
542    }
543}
544
545impl TryFrom<PbDropViewTask> for DropViewTask {
546    type Error = error::Error;
547
548    fn try_from(pb: PbDropViewTask) -> Result<Self> {
549        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
550            err_msg: "expected drop view",
551        })?;
552
553        Ok(DropViewTask {
554            catalog: expr.catalog_name,
555            schema: expr.schema_name,
556            view: expr.view_name,
557            view_id: expr
558                .view_id
559                .context(error::InvalidProtoMsgSnafu {
560                    err_msg: "expected view_id",
561                })?
562                .id,
563            drop_if_exists: expr.drop_if_exists,
564        })
565    }
566}
567
568impl From<DropViewTask> for PbDropViewTask {
569    fn from(task: DropViewTask) -> Self {
570        PbDropViewTask {
571            drop_view: Some(DropViewExpr {
572                catalog_name: task.catalog,
573                schema_name: task.schema,
574                view_name: task.view,
575                view_id: Some(api::v1::TableId { id: task.view_id }),
576                drop_if_exists: task.drop_if_exists,
577            }),
578        }
579    }
580}
581
582#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
583pub struct DropTableTask {
584    pub catalog: String,
585    pub schema: String,
586    pub table: String,
587    pub table_id: TableId,
588    #[serde(default)]
589    pub drop_if_exists: bool,
590}
591
592impl DropTableTask {
593    pub fn table_ref(&self) -> TableReference<'_> {
594        TableReference {
595            catalog: &self.catalog,
596            schema: &self.schema,
597            table: &self.table,
598        }
599    }
600
601    pub fn table_name(&self) -> TableName {
602        TableName {
603            catalog_name: self.catalog.clone(),
604            schema_name: self.schema.clone(),
605            table_name: self.table.clone(),
606        }
607    }
608}
609
610impl TryFrom<PbDropTableTask> for DropTableTask {
611    type Error = error::Error;
612
613    fn try_from(pb: PbDropTableTask) -> Result<Self> {
614        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
615            err_msg: "expected drop table",
616        })?;
617
618        Ok(Self {
619            catalog: drop_table.catalog_name,
620            schema: drop_table.schema_name,
621            table: drop_table.table_name,
622            table_id: drop_table
623                .table_id
624                .context(error::InvalidProtoMsgSnafu {
625                    err_msg: "expected table_id",
626                })?
627                .id,
628            drop_if_exists: drop_table.drop_if_exists,
629        })
630    }
631}
632
633impl From<DropTableTask> for PbDropTableTask {
634    fn from(task: DropTableTask) -> Self {
635        PbDropTableTask {
636            drop_table: Some(DropTableExpr {
637                catalog_name: task.catalog,
638                schema_name: task.schema,
639                table_name: task.table,
640                table_id: Some(api::v1::TableId { id: task.table_id }),
641                drop_if_exists: task.drop_if_exists,
642            }),
643        }
644    }
645}
646
647#[derive(Debug, PartialEq, Clone)]
648pub struct CreateTableTask {
649    pub create_table: CreateTableExpr,
650    pub partitions: Vec<Partition>,
651    pub table_info: RawTableInfo,
652}
653
654impl TryFrom<PbCreateTableTask> for CreateTableTask {
655    type Error = error::Error;
656
657    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
658        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
659
660        Ok(CreateTableTask::new(
661            pb.create_table.context(error::InvalidProtoMsgSnafu {
662                err_msg: "expected create table",
663            })?,
664            pb.partitions,
665            table_info,
666        ))
667    }
668}
669
670impl TryFrom<CreateTableTask> for PbCreateTableTask {
671    type Error = error::Error;
672
673    fn try_from(task: CreateTableTask) -> Result<Self> {
674        Ok(PbCreateTableTask {
675            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
676            create_table: Some(task.create_table),
677            partitions: task.partitions,
678        })
679    }
680}
681
682impl CreateTableTask {
683    pub fn new(
684        expr: CreateTableExpr,
685        partitions: Vec<Partition>,
686        table_info: RawTableInfo,
687    ) -> CreateTableTask {
688        CreateTableTask {
689            create_table: expr,
690            partitions,
691            table_info,
692        }
693    }
694
695    pub fn table_name(&self) -> TableName {
696        let table = &self.create_table;
697
698        TableName {
699            catalog_name: table.catalog_name.clone(),
700            schema_name: table.schema_name.clone(),
701            table_name: table.table_name.clone(),
702        }
703    }
704
705    pub fn table_ref(&self) -> TableReference<'_> {
706        let table = &self.create_table;
707
708        TableReference {
709            catalog: &table.catalog_name,
710            schema: &table.schema_name,
711            table: &table.table_name,
712        }
713    }
714
715    /// Sets the `table_info`'s table_id.
716    pub fn set_table_id(&mut self, table_id: TableId) {
717        self.table_info.ident.table_id = table_id;
718    }
719
720    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
721    ///
722    /// This function won't do any check or verification. Caller should
723    /// ensure this task is valid.
724    pub fn sort_columns(&mut self) {
725        // sort create table expr
726        // sort column_defs by name
727        self.create_table
728            .column_defs
729            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
730
731        self.table_info.sort_columns();
732    }
733}
734
735impl Serialize for CreateTableTask {
736    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
737    where
738        S: serde::Serializer,
739    {
740        let table_info = serde_json::to_vec(&self.table_info)
741            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
742
743        let pb = PbCreateTableTask {
744            create_table: Some(self.create_table.clone()),
745            partitions: self.partitions.clone(),
746            table_info,
747        };
748        let buf = pb.encode_to_vec();
749        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
750        serializer.serialize_str(&encoded)
751    }
752}
753
754impl<'de> Deserialize<'de> for CreateTableTask {
755    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
756    where
757        D: serde::Deserializer<'de>,
758    {
759        let encoded = String::deserialize(deserializer)?;
760        let buf = general_purpose::STANDARD_NO_PAD
761            .decode(encoded)
762            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
763        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
764            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
765
766        let expr = CreateTableTask::try_from(expr)
767            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
768
769        Ok(expr)
770    }
771}
772
773#[derive(Debug, PartialEq, Clone)]
774pub struct AlterTableTask {
775    // TODO(CookiePieWw): Replace proto struct with user-defined struct
776    pub alter_table: AlterTableExpr,
777}
778
779impl AlterTableTask {
780    pub fn validate(&self) -> Result<()> {
781        self.alter_table
782            .kind
783            .as_ref()
784            .context(error::UnexpectedSnafu {
785                err_msg: "'kind' is absent",
786            })?;
787        Ok(())
788    }
789
790    pub fn table_ref(&self) -> TableReference<'_> {
791        TableReference {
792            catalog: &self.alter_table.catalog_name,
793            schema: &self.alter_table.schema_name,
794            table: &self.alter_table.table_name,
795        }
796    }
797
798    pub fn table_name(&self) -> TableName {
799        let table = &self.alter_table;
800
801        TableName {
802            catalog_name: table.catalog_name.clone(),
803            schema_name: table.schema_name.clone(),
804            table_name: table.table_name.clone(),
805        }
806    }
807}
808
809impl TryFrom<PbAlterTableTask> for AlterTableTask {
810    type Error = error::Error;
811
812    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
813        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
814            err_msg: "expected alter_table",
815        })?;
816
817        Ok(AlterTableTask { alter_table })
818    }
819}
820
821impl TryFrom<AlterTableTask> for PbAlterTableTask {
822    type Error = error::Error;
823
824    fn try_from(task: AlterTableTask) -> Result<Self> {
825        Ok(PbAlterTableTask {
826            alter_table: Some(task.alter_table),
827        })
828    }
829}
830
831impl Serialize for AlterTableTask {
832    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
833    where
834        S: serde::Serializer,
835    {
836        let pb = PbAlterTableTask {
837            alter_table: Some(self.alter_table.clone()),
838        };
839        let buf = pb.encode_to_vec();
840        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
841        serializer.serialize_str(&encoded)
842    }
843}
844
845impl<'de> Deserialize<'de> for AlterTableTask {
846    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
847    where
848        D: serde::Deserializer<'de>,
849    {
850        let encoded = String::deserialize(deserializer)?;
851        let buf = general_purpose::STANDARD_NO_PAD
852            .decode(encoded)
853            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
854        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
855            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
856
857        let expr = AlterTableTask::try_from(expr)
858            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
859
860        Ok(expr)
861    }
862}
863
864#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
865pub struct TruncateTableTask {
866    pub catalog: String,
867    pub schema: String,
868    pub table: String,
869    pub table_id: TableId,
870    pub time_ranges: Vec<(Timestamp, Timestamp)>,
871}
872
873impl TruncateTableTask {
874    pub fn table_ref(&self) -> TableReference<'_> {
875        TableReference {
876            catalog: &self.catalog,
877            schema: &self.schema,
878            table: &self.table,
879        }
880    }
881
882    pub fn table_name(&self) -> TableName {
883        TableName {
884            catalog_name: self.catalog.clone(),
885            schema_name: self.schema.clone(),
886            table_name: self.table.clone(),
887        }
888    }
889}
890
891impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
892    type Error = error::Error;
893
894    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
895        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
896            err_msg: "expected truncate table",
897        })?;
898
899        Ok(Self {
900            catalog: truncate_table.catalog_name,
901            schema: truncate_table.schema_name,
902            table: truncate_table.table_name,
903            table_id: truncate_table
904                .table_id
905                .context(error::InvalidProtoMsgSnafu {
906                    err_msg: "expected table_id",
907                })?
908                .id,
909            time_ranges: truncate_table
910                .time_ranges
911                .map(from_pb_time_ranges)
912                .transpose()
913                .map_err(BoxedError::new)
914                .context(ExternalSnafu)?
915                .unwrap_or_default(),
916        })
917    }
918}
919
920impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
921    type Error = error::Error;
922
923    fn try_from(task: TruncateTableTask) -> Result<Self> {
924        Ok(PbTruncateTableTask {
925            truncate_table: Some(TruncateTableExpr {
926                catalog_name: task.catalog,
927                schema_name: task.schema,
928                table_name: task.table,
929                table_id: Some(api::v1::TableId { id: task.table_id }),
930                time_ranges: Some(
931                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
932                ),
933            }),
934        })
935    }
936}
937
938#[serde_as]
939#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
940pub struct CreateDatabaseTask {
941    pub catalog: String,
942    pub schema: String,
943    pub create_if_not_exists: bool,
944    #[serde_as(deserialize_as = "DefaultOnNull")]
945    pub options: HashMap<String, String>,
946}
947
948impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
949    type Error = error::Error;
950
951    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
952        let CreateDatabaseExpr {
953            catalog_name,
954            schema_name,
955            create_if_not_exists,
956            options,
957        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
958            err_msg: "expected create database",
959        })?;
960
961        Ok(CreateDatabaseTask {
962            catalog: catalog_name,
963            schema: schema_name,
964            create_if_not_exists,
965            options,
966        })
967    }
968}
969
970impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
971    type Error = error::Error;
972
973    fn try_from(
974        CreateDatabaseTask {
975            catalog,
976            schema,
977            create_if_not_exists,
978            options,
979        }: CreateDatabaseTask,
980    ) -> Result<Self> {
981        Ok(PbCreateDatabaseTask {
982            create_database: Some(CreateDatabaseExpr {
983                catalog_name: catalog,
984                schema_name: schema,
985                create_if_not_exists,
986                options,
987            }),
988        })
989    }
990}
991
992#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
993pub struct DropDatabaseTask {
994    pub catalog: String,
995    pub schema: String,
996    pub drop_if_exists: bool,
997}
998
999impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
1000    type Error = error::Error;
1001
1002    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1003        let DropDatabaseExpr {
1004            catalog_name,
1005            schema_name,
1006            drop_if_exists,
1007        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1008            err_msg: "expected drop database",
1009        })?;
1010
1011        Ok(DropDatabaseTask {
1012            catalog: catalog_name,
1013            schema: schema_name,
1014            drop_if_exists,
1015        })
1016    }
1017}
1018
1019impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1020    type Error = error::Error;
1021
1022    fn try_from(
1023        DropDatabaseTask {
1024            catalog,
1025            schema,
1026            drop_if_exists,
1027        }: DropDatabaseTask,
1028    ) -> Result<Self> {
1029        Ok(PbDropDatabaseTask {
1030            drop_database: Some(DropDatabaseExpr {
1031                catalog_name: catalog,
1032                schema_name: schema,
1033                drop_if_exists,
1034            }),
1035        })
1036    }
1037}
1038
1039#[derive(Debug, PartialEq, Clone)]
1040pub struct AlterDatabaseTask {
1041    pub alter_expr: AlterDatabaseExpr,
1042}
1043
1044impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1045    type Error = error::Error;
1046
1047    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1048        Ok(PbAlterDatabaseTask {
1049            task: Some(task.alter_expr),
1050        })
1051    }
1052}
1053
1054impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1055    type Error = error::Error;
1056
1057    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1058        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1059            err_msg: "expected alter database",
1060        })?;
1061
1062        Ok(AlterDatabaseTask { alter_expr })
1063    }
1064}
1065
1066impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1067    type Error = error::Error;
1068
1069    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1070        match pb {
1071            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1072                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1073                    options
1074                        .set_database_options
1075                        .into_iter()
1076                        .map(SetDatabaseOption::try_from)
1077                        .collect::<Result<Vec<_>>>()?,
1078                )))
1079            }
1080            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1081                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1082                    options
1083                        .keys
1084                        .iter()
1085                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1086                        .collect::<Result<Vec<_>>>()?,
1087                )),
1088            ),
1089        }
1090    }
1091}
1092
1093const TTL_KEY: &str = "ttl";
1094
1095impl TryFrom<PbOption> for SetDatabaseOption {
1096    type Error = error::Error;
1097
1098    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1099        let key_lower = key.to_ascii_lowercase();
1100        match key_lower.as_str() {
1101            TTL_KEY => {
1102                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1103                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1104
1105                Ok(SetDatabaseOption::Ttl(ttl))
1106            }
1107            _ => {
1108                if validate_database_option(&key_lower) {
1109                    Ok(SetDatabaseOption::Other(key_lower, value))
1110                } else {
1111                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1112                }
1113            }
1114        }
1115    }
1116}
1117
1118#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1119pub enum SetDatabaseOption {
1120    Ttl(DatabaseTimeToLive),
1121    Other(String, String),
1122}
1123
1124#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1125pub enum UnsetDatabaseOption {
1126    Ttl,
1127    Other(String),
1128}
1129
1130impl TryFrom<&str> for UnsetDatabaseOption {
1131    type Error = error::Error;
1132
1133    fn try_from(key: &str) -> Result<Self> {
1134        let key_lower = key.to_ascii_lowercase();
1135        match key_lower.as_str() {
1136            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1137            _ => {
1138                if validate_database_option(&key_lower) {
1139                    Ok(UnsetDatabaseOption::Other(key_lower))
1140                } else {
1141                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1142                }
1143            }
1144        }
1145    }
1146}
1147
1148#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1149pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1150
1151#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1152pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1153
1154#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1155pub enum AlterDatabaseKind {
1156    SetDatabaseOptions(SetDatabaseOptions),
1157    UnsetDatabaseOptions(UnsetDatabaseOptions),
1158}
1159
1160impl AlterDatabaseTask {
1161    pub fn catalog(&self) -> &str {
1162        &self.alter_expr.catalog_name
1163    }
1164
1165    pub fn schema(&self) -> &str {
1166        &self.alter_expr.catalog_name
1167    }
1168}
1169
1170/// Create flow
1171#[derive(Debug, Clone, Serialize, Deserialize)]
1172pub struct CreateFlowTask {
1173    pub catalog_name: String,
1174    pub flow_name: String,
1175    pub source_table_names: Vec<TableName>,
1176    pub sink_table_name: TableName,
1177    pub or_replace: bool,
1178    pub create_if_not_exists: bool,
1179    /// Duration in seconds. Data older than this duration will not be used.
1180    pub expire_after: Option<i64>,
1181    pub eval_interval_secs: Option<i64>,
1182    pub comment: String,
1183    pub sql: String,
1184    pub flow_options: HashMap<String, String>,
1185}
1186
1187impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1188    type Error = error::Error;
1189
1190    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1191        let CreateFlowExpr {
1192            catalog_name,
1193            flow_name,
1194            source_table_names,
1195            sink_table_name,
1196            or_replace,
1197            create_if_not_exists,
1198            expire_after,
1199            eval_interval,
1200            comment,
1201            sql,
1202            flow_options,
1203        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1204            err_msg: "expected create_flow",
1205        })?;
1206
1207        Ok(CreateFlowTask {
1208            catalog_name,
1209            flow_name,
1210            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1211            sink_table_name: sink_table_name
1212                .context(error::InvalidProtoMsgSnafu {
1213                    err_msg: "expected sink_table_name",
1214                })?
1215                .into(),
1216            or_replace,
1217            create_if_not_exists,
1218            expire_after: expire_after.map(|e| e.value),
1219            eval_interval_secs: eval_interval.map(|e| e.seconds),
1220            comment,
1221            sql,
1222            flow_options,
1223        })
1224    }
1225}
1226
1227impl From<CreateFlowTask> for PbCreateFlowTask {
1228    fn from(
1229        CreateFlowTask {
1230            catalog_name,
1231            flow_name,
1232            source_table_names,
1233            sink_table_name,
1234            or_replace,
1235            create_if_not_exists,
1236            expire_after,
1237            eval_interval_secs: eval_interval,
1238            comment,
1239            sql,
1240            flow_options,
1241        }: CreateFlowTask,
1242    ) -> Self {
1243        PbCreateFlowTask {
1244            create_flow: Some(CreateFlowExpr {
1245                catalog_name,
1246                flow_name,
1247                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1248                sink_table_name: Some(sink_table_name.into()),
1249                or_replace,
1250                create_if_not_exists,
1251                expire_after: expire_after.map(|value| ExpireAfter { value }),
1252                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1253                comment,
1254                sql,
1255                flow_options,
1256            }),
1257        }
1258    }
1259}
1260
1261/// Drop flow
1262#[derive(Debug, Clone, Serialize, Deserialize)]
1263pub struct DropFlowTask {
1264    pub catalog_name: String,
1265    pub flow_name: String,
1266    pub flow_id: FlowId,
1267    pub drop_if_exists: bool,
1268}
1269
1270impl TryFrom<PbDropFlowTask> for DropFlowTask {
1271    type Error = error::Error;
1272
1273    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1274        let DropFlowExpr {
1275            catalog_name,
1276            flow_name,
1277            flow_id,
1278            drop_if_exists,
1279        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1280            err_msg: "expected drop_flow",
1281        })?;
1282        let flow_id = flow_id
1283            .context(error::InvalidProtoMsgSnafu {
1284                err_msg: "expected flow_id",
1285            })?
1286            .id;
1287        Ok(DropFlowTask {
1288            catalog_name,
1289            flow_name,
1290            flow_id,
1291            drop_if_exists,
1292        })
1293    }
1294}
1295
1296impl From<DropFlowTask> for PbDropFlowTask {
1297    fn from(
1298        DropFlowTask {
1299            catalog_name,
1300            flow_name,
1301            flow_id,
1302            drop_if_exists,
1303        }: DropFlowTask,
1304    ) -> Self {
1305        PbDropFlowTask {
1306            drop_flow: Some(DropFlowExpr {
1307                catalog_name,
1308                flow_name,
1309                flow_id: Some(api::v1::FlowId { id: flow_id }),
1310                drop_if_exists,
1311            }),
1312        }
1313    }
1314}
1315
1316/// Represents the ID of the object being commented on (Table or Flow).
1317#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1318pub enum CommentObjectId {
1319    Table(TableId),
1320    Flow(FlowId),
1321}
1322
1323/// Comment on table, column, or flow
1324#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1325pub struct CommentOnTask {
1326    pub catalog_name: String,
1327    pub schema_name: String,
1328    pub object_type: CommentObjectType,
1329    pub object_name: String,
1330    /// Column name (only for Column comments)
1331    pub column_name: Option<String>,
1332    /// Object ID (Table or Flow) for validation and cache invalidation
1333    pub object_id: Option<CommentObjectId>,
1334    pub comment: Option<String>,
1335}
1336
1337#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1338pub enum CommentObjectType {
1339    Table,
1340    Column,
1341    Flow,
1342}
1343
1344impl CommentOnTask {
1345    pub fn table_ref(&self) -> TableReference<'_> {
1346        TableReference {
1347            catalog: &self.catalog_name,
1348            schema: &self.schema_name,
1349            table: &self.object_name,
1350        }
1351    }
1352}
1353
1354// Proto conversions for CommentObjectType
1355impl From<CommentObjectType> for PbCommentObjectType {
1356    fn from(object_type: CommentObjectType) -> Self {
1357        match object_type {
1358            CommentObjectType::Table => PbCommentObjectType::Table,
1359            CommentObjectType::Column => PbCommentObjectType::Column,
1360            CommentObjectType::Flow => PbCommentObjectType::Flow,
1361        }
1362    }
1363}
1364
1365impl TryFrom<i32> for CommentObjectType {
1366    type Error = error::Error;
1367
1368    fn try_from(value: i32) -> Result<Self> {
1369        match value {
1370            0 => Ok(CommentObjectType::Table),
1371            1 => Ok(CommentObjectType::Column),
1372            2 => Ok(CommentObjectType::Flow),
1373            _ => error::InvalidProtoMsgSnafu {
1374                err_msg: format!(
1375                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1376                    value
1377                ),
1378            }
1379            .fail(),
1380        }
1381    }
1382}
1383
1384// Proto conversions for CommentOnTask
1385impl TryFrom<PbCommentOnTask> for CommentOnTask {
1386    type Error = error::Error;
1387
1388    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1389        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1390            err_msg: "expected comment_on",
1391        })?;
1392
1393        Ok(CommentOnTask {
1394            catalog_name: comment_on.catalog_name,
1395            schema_name: comment_on.schema_name,
1396            object_type: comment_on.object_type.try_into()?,
1397            object_name: comment_on.object_name,
1398            column_name: if comment_on.column_name.is_empty() {
1399                None
1400            } else {
1401                Some(comment_on.column_name)
1402            },
1403            comment: if comment_on.comment.is_empty() {
1404                None
1405            } else {
1406                Some(comment_on.comment)
1407            },
1408            object_id: None,
1409        })
1410    }
1411}
1412
1413impl From<CommentOnTask> for PbCommentOnTask {
1414    fn from(task: CommentOnTask) -> Self {
1415        let pb_object_type: PbCommentObjectType = task.object_type.into();
1416        PbCommentOnTask {
1417            comment_on: Some(CommentOnExpr {
1418                catalog_name: task.catalog_name,
1419                schema_name: task.schema_name,
1420                object_type: pb_object_type as i32,
1421                object_name: task.object_name,
1422                column_name: task.column_name.unwrap_or_default(),
1423                comment: task.comment.unwrap_or_default(),
1424            }),
1425        }
1426    }
1427}
1428
1429#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1430pub struct QueryContext {
1431    pub(crate) current_catalog: String,
1432    pub(crate) current_schema: String,
1433    pub(crate) timezone: String,
1434    pub(crate) extensions: HashMap<String, String>,
1435    pub(crate) channel: u8,
1436}
1437
1438impl QueryContext {
1439    /// Get the current catalog
1440    pub fn current_catalog(&self) -> &str {
1441        &self.current_catalog
1442    }
1443
1444    /// Get the current schema
1445    pub fn current_schema(&self) -> &str {
1446        &self.current_schema
1447    }
1448
1449    /// Get the timezone
1450    pub fn timezone(&self) -> &str {
1451        &self.timezone
1452    }
1453
1454    /// Get the extensions
1455    pub fn extensions(&self) -> &HashMap<String, String> {
1456        &self.extensions
1457    }
1458
1459    /// Get the channel
1460    pub fn channel(&self) -> u8 {
1461        self.channel
1462    }
1463}
1464
1465/// Lightweight query context for flow operations containing only essential fields.
1466/// This is a subset of QueryContext that includes only the fields actually needed
1467/// for flow creation and execution.
1468#[derive(Debug, Clone, Serialize, PartialEq)]
1469pub struct FlowQueryContext {
1470    /// Current catalog name - needed for flow metadata and recovery
1471    pub(crate) catalog: String,
1472    /// Current schema name - needed for table resolution during flow execution
1473    pub(crate) schema: String,
1474    /// Timezone for timestamp operations in the flow
1475    pub(crate) timezone: String,
1476}
1477
1478impl<'de> Deserialize<'de> for FlowQueryContext {
1479    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1480    where
1481        D: serde::Deserializer<'de>,
1482    {
1483        // Support both QueryContext format and FlowQueryContext format
1484        #[derive(Deserialize)]
1485        #[serde(untagged)]
1486        enum ContextCompat {
1487            Flow(FlowQueryContextHelper),
1488            Full(QueryContext),
1489        }
1490
1491        #[derive(Deserialize)]
1492        struct FlowQueryContextHelper {
1493            catalog: String,
1494            schema: String,
1495            timezone: String,
1496        }
1497
1498        match ContextCompat::deserialize(deserializer)? {
1499            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1500                catalog: helper.catalog,
1501                schema: helper.schema,
1502                timezone: helper.timezone,
1503            }),
1504            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1505        }
1506    }
1507}
1508
1509impl From<QueryContextRef> for QueryContext {
1510    fn from(query_context: QueryContextRef) -> Self {
1511        QueryContext {
1512            current_catalog: query_context.current_catalog().to_string(),
1513            current_schema: query_context.current_schema().clone(),
1514            timezone: query_context.timezone().to_string(),
1515            extensions: query_context.extensions(),
1516            channel: query_context.channel() as u8,
1517        }
1518    }
1519}
1520
1521impl TryFrom<QueryContext> for session::context::QueryContext {
1522    type Error = error::Error;
1523    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1524        Ok(QueryContextBuilder::default()
1525            .current_catalog(value.current_catalog)
1526            .current_schema(value.current_schema)
1527            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1528            .extensions(value.extensions)
1529            .channel((value.channel as u32).into())
1530            .build())
1531    }
1532}
1533
1534impl From<QueryContext> for PbQueryContext {
1535    fn from(
1536        QueryContext {
1537            current_catalog,
1538            current_schema,
1539            timezone,
1540            extensions,
1541            channel,
1542        }: QueryContext,
1543    ) -> Self {
1544        PbQueryContext {
1545            current_catalog,
1546            current_schema,
1547            timezone,
1548            extensions,
1549            channel: channel as u32,
1550            snapshot_seqs: None,
1551            explain: None,
1552        }
1553    }
1554}
1555
1556impl From<QueryContext> for FlowQueryContext {
1557    fn from(ctx: QueryContext) -> Self {
1558        Self {
1559            catalog: ctx.current_catalog,
1560            schema: ctx.current_schema,
1561            timezone: ctx.timezone,
1562        }
1563    }
1564}
1565
1566impl From<QueryContextRef> for FlowQueryContext {
1567    fn from(ctx: QueryContextRef) -> Self {
1568        Self {
1569            catalog: ctx.current_catalog().to_string(),
1570            schema: ctx.current_schema().clone(),
1571            timezone: ctx.timezone().to_string(),
1572        }
1573    }
1574}
1575
1576impl From<FlowQueryContext> for QueryContext {
1577    fn from(flow_ctx: FlowQueryContext) -> Self {
1578        Self {
1579            current_catalog: flow_ctx.catalog,
1580            current_schema: flow_ctx.schema,
1581            timezone: flow_ctx.timezone,
1582            extensions: HashMap::new(),
1583            channel: 0, // Use default channel for flows
1584        }
1585    }
1586}
1587
1588impl From<FlowQueryContext> for PbQueryContext {
1589    fn from(flow_ctx: FlowQueryContext) -> Self {
1590        let query_ctx: QueryContext = flow_ctx.into();
1591        query_ctx.into()
1592    }
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597    use std::sync::Arc;
1598
1599    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1600    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
1601    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1602    use store_api::storage::ConcreteDataType;
1603    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
1604    use table::test_util::table_info::test_table_info;
1605
1606    use super::{AlterTableTask, CreateTableTask, *};
1607
1608    #[test]
1609    fn test_basic_ser_de_create_table_task() {
1610        let schema = SchemaBuilder::default().build().unwrap();
1611        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1612        let task = CreateTableTask::new(
1613            CreateTableExpr::default(),
1614            Vec::new(),
1615            RawTableInfo::from(table_info),
1616        );
1617
1618        let output = serde_json::to_vec(&task).unwrap();
1619
1620        let de = serde_json::from_slice(&output).unwrap();
1621        assert_eq!(task, de);
1622    }
1623
1624    #[test]
1625    fn test_basic_ser_de_alter_table_task() {
1626        let task = AlterTableTask {
1627            alter_table: AlterTableExpr::default(),
1628        };
1629
1630        let output = serde_json::to_vec(&task).unwrap();
1631
1632        let de = serde_json::from_slice(&output).unwrap();
1633        assert_eq!(task, de);
1634    }
1635
1636    #[test]
1637    fn test_sort_columns() {
1638        // construct RawSchema
1639        let raw_schema = RawSchema {
1640            column_schemas: vec![
1641                ColumnSchema::new(
1642                    "column3".to_string(),
1643                    ConcreteDataType::string_datatype(),
1644                    true,
1645                ),
1646                ColumnSchema::new(
1647                    "column1".to_string(),
1648                    ConcreteDataType::timestamp_millisecond_datatype(),
1649                    false,
1650                )
1651                .with_time_index(true),
1652                ColumnSchema::new(
1653                    "column2".to_string(),
1654                    ConcreteDataType::float64_datatype(),
1655                    true,
1656                ),
1657            ],
1658            timestamp_index: Some(1),
1659            version: 0,
1660        };
1661
1662        // construct RawTableMeta
1663        let raw_table_meta = RawTableMeta {
1664            schema: raw_schema,
1665            primary_key_indices: vec![0],
1666            value_indices: vec![2],
1667            engine: METRIC_ENGINE_NAME.to_string(),
1668            next_column_id: 0,
1669            options: Default::default(),
1670            created_on: Default::default(),
1671            updated_on: Default::default(),
1672            partition_key_indices: Default::default(),
1673            column_ids: Default::default(),
1674        };
1675
1676        // construct RawTableInfo
1677        let raw_table_info = RawTableInfo {
1678            ident: Default::default(),
1679            meta: raw_table_meta,
1680            name: Default::default(),
1681            desc: Default::default(),
1682            catalog_name: Default::default(),
1683            schema_name: Default::default(),
1684            table_type: TableType::Base,
1685        };
1686
1687        // construct create table expr
1688        let create_table_expr = CreateTableExpr {
1689            column_defs: vec![
1690                ColumnDef {
1691                    name: "column3".to_string(),
1692                    semantic_type: SemanticType::Tag as i32,
1693                    ..Default::default()
1694                },
1695                ColumnDef {
1696                    name: "column1".to_string(),
1697                    semantic_type: SemanticType::Timestamp as i32,
1698                    ..Default::default()
1699                },
1700                ColumnDef {
1701                    name: "column2".to_string(),
1702                    semantic_type: SemanticType::Field as i32,
1703                    ..Default::default()
1704                },
1705            ],
1706            primary_keys: vec!["column3".to_string()],
1707            ..Default::default()
1708        };
1709
1710        let mut create_table_task =
1711            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1712
1713        // Call the sort_columns method
1714        create_table_task.sort_columns();
1715
1716        // Assert that the columns are sorted correctly
1717        assert_eq!(
1718            create_table_task.create_table.column_defs[0].name,
1719            "column1".to_string()
1720        );
1721        assert_eq!(
1722            create_table_task.create_table.column_defs[1].name,
1723            "column2".to_string()
1724        );
1725        assert_eq!(
1726            create_table_task.create_table.column_defs[2].name,
1727            "column3".to_string()
1728        );
1729
1730        // Assert that the table_info is updated correctly
1731        assert_eq!(
1732            create_table_task.table_info.meta.schema.timestamp_index,
1733            Some(0)
1734        );
1735        assert_eq!(
1736            create_table_task.table_info.meta.primary_key_indices,
1737            vec![2]
1738        );
1739        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1740    }
1741
1742    #[test]
1743    fn test_flow_query_context_conversion_from_query_context() {
1744        use std::collections::HashMap;
1745        let mut extensions = HashMap::new();
1746        extensions.insert("key1".to_string(), "value1".to_string());
1747        extensions.insert("key2".to_string(), "value2".to_string());
1748
1749        let query_ctx = QueryContext {
1750            current_catalog: "test_catalog".to_string(),
1751            current_schema: "test_schema".to_string(),
1752            timezone: "UTC".to_string(),
1753            extensions,
1754            channel: 5,
1755        };
1756
1757        let flow_ctx: FlowQueryContext = query_ctx.into();
1758
1759        assert_eq!(flow_ctx.catalog, "test_catalog");
1760        assert_eq!(flow_ctx.schema, "test_schema");
1761        assert_eq!(flow_ctx.timezone, "UTC");
1762    }
1763
1764    #[test]
1765    fn test_flow_query_context_conversion_to_query_context() {
1766        let flow_ctx = FlowQueryContext {
1767            catalog: "prod_catalog".to_string(),
1768            schema: "public".to_string(),
1769            timezone: "America/New_York".to_string(),
1770        };
1771
1772        let query_ctx: QueryContext = flow_ctx.clone().into();
1773
1774        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1775        assert_eq!(query_ctx.current_schema, "public");
1776        assert_eq!(query_ctx.timezone, "America/New_York");
1777        assert!(query_ctx.extensions.is_empty());
1778        assert_eq!(query_ctx.channel, 0);
1779
1780        // Test roundtrip conversion
1781        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1782        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1783    }
1784
1785    #[test]
1786    fn test_flow_query_context_conversion_from_query_context_ref() {
1787        use common_time::Timezone;
1788        use session::context::QueryContextBuilder;
1789
1790        let session_ctx = QueryContextBuilder::default()
1791            .current_catalog("session_catalog".to_string())
1792            .current_schema("session_schema".to_string())
1793            .timezone(Timezone::from_tz_string("Europe/London").unwrap())
1794            .build();
1795
1796        let session_ctx_ref = Arc::new(session_ctx);
1797        let flow_ctx: FlowQueryContext = session_ctx_ref.into();
1798
1799        assert_eq!(flow_ctx.catalog, "session_catalog");
1800        assert_eq!(flow_ctx.schema, "session_schema");
1801        assert_eq!(flow_ctx.timezone, "Europe/London");
1802    }
1803
1804    #[test]
1805    fn test_flow_query_context_serialization() {
1806        let flow_ctx = FlowQueryContext {
1807            catalog: "test_catalog".to_string(),
1808            schema: "test_schema".to_string(),
1809            timezone: "UTC".to_string(),
1810        };
1811
1812        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1813        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1814
1815        assert_eq!(flow_ctx, deserialized);
1816
1817        // Verify JSON structure
1818        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1819        assert_eq!(json_value["catalog"], "test_catalog");
1820        assert_eq!(json_value["schema"], "test_schema");
1821        assert_eq!(json_value["timezone"], "UTC");
1822    }
1823
1824    #[test]
1825    fn test_flow_query_context_conversion_to_pb() {
1826        let flow_ctx = FlowQueryContext {
1827            catalog: "pb_catalog".to_string(),
1828            schema: "pb_schema".to_string(),
1829            timezone: "Asia/Tokyo".to_string(),
1830        };
1831
1832        let pb_ctx: PbQueryContext = flow_ctx.into();
1833
1834        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1835        assert_eq!(pb_ctx.current_schema, "pb_schema");
1836        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1837        assert!(pb_ctx.extensions.is_empty());
1838        assert_eq!(pb_ctx.channel, 0);
1839        assert!(pb_ctx.snapshot_seqs.is_none());
1840        assert!(pb_ctx.explain.is_none());
1841    }
1842}