Skip to main content

common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34    TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40    QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57    InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
/// DDL tasks
///
/// Each variant carries the payload for one DDL operation submitted through
/// the meta RPC layer; the `*LogicalTables` variants batch several per-table
/// tasks into a single procedure.
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a single table.
    CreateTable(CreateTableTask),
    /// Drop a single table.
    DropTable(DropTableTask),
    /// Alter a single table.
    AlterTable(AlterTableTask),
    /// Truncate a table, optionally restricted to the task's time ranges.
    TruncateTable(TruncateTableTask),
    /// Create a batch of logical tables in one procedure.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Drop a batch of logical tables in one procedure.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alter a batch of logical tables in one procedure.
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter database options.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Drop a trigger (enterprise builds only).
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
    /// Create a trigger (enterprise builds only).
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
    /// Attach a comment to a table, column, or flow.
    CommentOn(CommentOnTask),
}
84
85impl DdlTask {
86    /// Creates a [`DdlTask`] to create a flow.
87    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
88        DdlTask::CreateFlow(expr)
89    }
90
91    /// Creates a [`DdlTask`] to drop a flow.
92    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
93        DdlTask::DropFlow(expr)
94    }
95
96    /// Creates a [`DdlTask`] to drop a view.
97    pub fn new_drop_view(expr: DropViewTask) -> Self {
98        DdlTask::DropView(expr)
99    }
100
101    /// Creates a [`DdlTask`] to create a table.
102    pub fn new_create_table(
103        expr: CreateTableExpr,
104        partitions: Vec<Partition>,
105        table_info: TableInfo,
106    ) -> Self {
107        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
108    }
109
110    /// Creates a [`DdlTask`] to create several logical tables.
111    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
112        DdlTask::CreateLogicalTables(
113            table_data
114                .into_iter()
115                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
116                .collect(),
117        )
118    }
119
120    /// Creates a [`DdlTask`] to alter several logical tables.
121    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
122        DdlTask::AlterLogicalTables(
123            table_data
124                .into_iter()
125                .map(|alter_table| AlterTableTask { alter_table })
126                .collect(),
127        )
128    }
129
130    /// Creates a [`DdlTask`] to drop a table.
131    pub fn new_drop_table(
132        catalog: String,
133        schema: String,
134        table: String,
135        table_id: TableId,
136        drop_if_exists: bool,
137    ) -> Self {
138        DdlTask::DropTable(DropTableTask {
139            catalog,
140            schema,
141            table,
142            table_id,
143            drop_if_exists,
144        })
145    }
146
147    /// Creates a [`DdlTask`] to create a database.
148    pub fn new_create_database(
149        catalog: String,
150        schema: String,
151        create_if_not_exists: bool,
152        options: HashMap<String, String>,
153    ) -> Self {
154        DdlTask::CreateDatabase(CreateDatabaseTask {
155            catalog,
156            schema,
157            create_if_not_exists,
158            options,
159        })
160    }
161
162    /// Creates a [`DdlTask`] to drop a database.
163    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
164        DdlTask::DropDatabase(DropDatabaseTask {
165            catalog,
166            schema,
167            drop_if_exists,
168        })
169    }
170
171    /// Creates a [`DdlTask`] to alter a database.
172    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
173        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
174    }
175
176    /// Creates a [`DdlTask`] to alter a table.
177    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
178        DdlTask::AlterTable(AlterTableTask { alter_table })
179    }
180
181    /// Creates a [`DdlTask`] to truncate a table.
182    pub fn new_truncate_table(
183        catalog: String,
184        schema: String,
185        table: String,
186        table_id: TableId,
187        time_ranges: Vec<(Timestamp, Timestamp)>,
188    ) -> Self {
189        DdlTask::TruncateTable(TruncateTableTask {
190            catalog,
191            schema,
192            table,
193            table_id,
194            time_ranges,
195        })
196    }
197
198    /// Creates a [`DdlTask`] to create a view.
199    pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
200        DdlTask::CreateView(CreateViewTask {
201            create_view,
202            view_info,
203        })
204    }
205
206    /// Creates a [`DdlTask`] to comment on a table, column, or flow.
207    pub fn new_comment_on(task: CommentOnTask) -> Self {
208        DdlTask::CommentOn(task)
209    }
210}
211
impl TryFrom<Task> for DdlTask {
    type Error = error::Error;
    /// Decodes a protobuf [`Task`] into the internal [`DdlTask`].
    ///
    /// Fails when a nested expression is missing or malformed; trigger tasks
    /// fail with an `Unsupported` error when the `enterprise` feature is off.
    fn try_from(task: Task) -> Result<Self> {
        match task {
            Task::CreateTableTask(create_table) => {
                Ok(DdlTask::CreateTable(create_table.try_into()?))
            }
            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
            Task::TruncateTableTask(truncate_table) => {
                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
            }
            Task::CreateTableTasks(create_tables) => {
                // Convert each batched task, short-circuiting on the first failure.
                let tasks = create_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::CreateLogicalTables(tasks))
            }
            Task::DropTableTasks(drop_tables) => {
                let tasks = drop_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::DropLogicalTables(tasks))
            }
            Task::AlterTableTasks(alter_tables) => {
                let tasks = alter_tables
                    .tasks
                    .into_iter()
                    .map(|task| task.try_into())
                    .collect::<Result<Vec<_>>>()?;

                Ok(DdlTask::AlterLogicalTables(tasks))
            }
            Task::CreateDatabaseTask(create_database) => {
                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
            }
            Task::DropDatabaseTask(drop_database) => {
                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
            }
            Task::AlterDatabaseTask(alter_database) => {
                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
            }
            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
            Task::CreateTriggerTask(create_trigger) => {
                // Triggers exist only in enterprise builds; the OSS build still
                // recognizes the payload but rejects it as unsupported.
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    // Silence the unused-binding warning in the OSS build.
                    let _ = create_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "create trigger",
                    }
                    .fail()
                }
            }
            Task::DropTriggerTask(drop_trigger) => {
                #[cfg(feature = "enterprise")]
                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
                #[cfg(not(feature = "enterprise"))]
                {
                    let _ = drop_trigger;
                    crate::error::UnsupportedSnafu {
                        operation: "drop trigger",
                    }
                    .fail()
                }
            }
            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
        }
    }
}
292
/// A request submitting one [`DdlTask`] for execution.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context forwarded alongside the task.
    pub query_context: QueryContext,
    /// Whether to block until the DDL procedure finishes.
    pub wait: bool,
    /// Maximum time the server may spend on the task.
    pub timeout: Duration,
    /// The DDL task payload.
    pub task: DdlTask,
}
300
301impl SubmitDdlTaskRequest {
302    /// The default constructor for [`SubmitDdlTaskRequest`].
303    pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
304        Self {
305            query_context,
306            wait: Self::default_wait(),
307            timeout: Self::default_timeout(),
308            task,
309        }
310    }
311
312    /// The default timeout for a DDL task.
313    pub fn default_timeout() -> Duration {
314        Duration::from_secs(60)
315    }
316
317    /// The default wait for a DDL task.
318    pub fn default_wait() -> bool {
319        true
320    }
321}
322
323impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
324    type Error = error::Error;
325
326    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
327        let task = match request.task {
328            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
329            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
330            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
331            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
332            DdlTask::CreateLogicalTables(tasks) => {
333                let tasks = tasks
334                    .into_iter()
335                    .map(|task| task.try_into())
336                    .collect::<Result<Vec<_>>>()?;
337
338                Task::CreateTableTasks(PbCreateTableTasks { tasks })
339            }
340            DdlTask::DropLogicalTables(tasks) => {
341                let tasks = tasks
342                    .into_iter()
343                    .map(|task| task.into())
344                    .collect::<Vec<_>>();
345
346                Task::DropTableTasks(PbDropTableTasks { tasks })
347            }
348            DdlTask::AlterLogicalTables(tasks) => {
349                let tasks = tasks
350                    .into_iter()
351                    .map(|task| task.try_into())
352                    .collect::<Result<Vec<_>>>()?;
353
354                Task::AlterTableTasks(PbAlterTableTasks { tasks })
355            }
356            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
357            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
358            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
359            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
360            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
361            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
362            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
363            #[cfg(feature = "enterprise")]
364            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
365            #[cfg(feature = "enterprise")]
366            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
367            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
368        };
369
370        Ok(Self {
371            header: None,
372            query_context: Some(request.query_context.into()),
373            timeout_secs: request.timeout.as_secs() as u32,
374            wait: request.wait,
375            task: Some(task),
376        })
377    }
378}
379
/// The response for a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// The procedure key of the DDL procedure; empty when the server
    /// returned no procedure id.
    pub key: Vec<u8>,
    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
386
387impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
388    type Error = error::Error;
389
390    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
391        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
392        Ok(Self {
393            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
394            table_ids,
395        })
396    }
397}
398
399impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
400    fn from(val: SubmitDdlTaskResponse) -> Self {
401        Self {
402            pid: Some(ProcedureId { key: val.key }),
403            table_ids: val
404                .table_ids
405                .into_iter()
406                .map(|id| api::v1::TableId { id })
407                .collect(),
408            ..Default::default()
409        }
410    }
411}
412
/// A `CREATE VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The create-view expression, including the encoded logical plan.
    pub create_view: CreateViewExpr,
    /// Table metadata describing the view.
    pub view_info: TableInfo,
}
419
impl CreateViewTask {
    /// Returns the [`TableReference`] of view.
    pub fn table_ref(&self) -> TableReference<'_> {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
            table: &self.create_view.view_name,
        }
    }

    /// Returns the encoded logical plan bytes of the view.
    pub fn raw_logical_plan(&self) -> &Vec<u8> {
        &self.create_view.logical_plan
    }

    /// Returns the view definition in SQL.
    pub fn view_definition(&self) -> &str {
        &self.create_view.definition
    }

    /// Returns the resolved table names referenced by the view's logical plan,
    /// deduplicated into a set.
    pub fn table_names(&self) -> HashSet<TableName> {
        self.create_view
            .table_names
            .iter()
            .map(|t| t.clone().into())
            .collect()
    }

    /// Returns the view's declared column names.
    pub fn columns(&self) -> &Vec<String> {
        &self.create_view.columns
    }

    /// Returns the original logical plan's column names.
    pub fn plan_columns(&self) -> &Vec<String> {
        &self.create_view.plan_columns
    }
}
459
460impl TryFrom<PbCreateViewTask> for CreateViewTask {
461    type Error = error::Error;
462
463    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
464        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
465
466        Ok(CreateViewTask {
467            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
468                err_msg: "expected create view",
469            })?,
470            view_info,
471        })
472    }
473}
474
475impl TryFrom<CreateViewTask> for PbCreateViewTask {
476    type Error = error::Error;
477
478    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
479        Ok(PbCreateViewTask {
480            create_view: Some(task.create_view),
481            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
482        })
483    }
484}
485
impl Serialize for CreateViewTask {
    /// Serializes the task as a base64 (`STANDARD_NO_PAD`) string of its
    /// protobuf encoding, with `view_info` embedded as JSON bytes.
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `TableInfo` is not a protobuf message, so it is JSON-encoded first.
        let view_info = serde_json::to_vec(&self.view_info)
            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

        let pb = PbCreateViewTask {
            create_view: Some(self.create_view.clone()),
            view_info,
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}
503
impl<'de> Deserialize<'de> for CreateViewTask {
    /// Inverse of `Serialize`: base64-decode, protobuf-decode, then convert
    /// via [`CreateViewTask::try_from`].
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = CreateViewTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}
522
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog of the view.
    pub catalog: String,
    /// Schema of the view.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// The table id assigned to the view.
    pub view_id: TableId,
    /// `IF EXISTS` flag from the original statement.
    pub drop_if_exists: bool,
}
532
533impl DropViewTask {
534    /// Returns the [`TableReference`] of view.
535    pub fn table_ref(&self) -> TableReference<'_> {
536        TableReference {
537            catalog: &self.catalog,
538            schema: &self.schema,
539            table: &self.view,
540        }
541    }
542}
543
544impl TryFrom<PbDropViewTask> for DropViewTask {
545    type Error = error::Error;
546
547    fn try_from(pb: PbDropViewTask) -> Result<Self> {
548        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
549            err_msg: "expected drop view",
550        })?;
551
552        Ok(DropViewTask {
553            catalog: expr.catalog_name,
554            schema: expr.schema_name,
555            view: expr.view_name,
556            view_id: expr
557                .view_id
558                .context(error::InvalidProtoMsgSnafu {
559                    err_msg: "expected view_id",
560                })?
561                .id,
562            drop_if_exists: expr.drop_if_exists,
563        })
564    }
565}
566
567impl From<DropViewTask> for PbDropViewTask {
568    fn from(task: DropViewTask) -> Self {
569        PbDropViewTask {
570            drop_view: Some(DropViewExpr {
571                catalog_name: task.catalog,
572                schema_name: task.schema,
573                view_name: task.view,
574                view_id: Some(api::v1::TableId { id: task.view_id }),
575                drop_if_exists: task.drop_if_exists,
576            }),
577        }
578    }
579}
580
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog of the table.
    pub catalog: String,
    /// Schema of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// `IF EXISTS` flag; defaults to `false` so payloads serialized without
    /// this field still deserialize.
    #[serde(default)]
    pub drop_if_exists: bool,
}
590
591impl DropTableTask {
592    pub fn table_ref(&self) -> TableReference<'_> {
593        TableReference {
594            catalog: &self.catalog,
595            schema: &self.schema,
596            table: &self.table,
597        }
598    }
599
600    pub fn table_name(&self) -> TableName {
601        TableName {
602            catalog_name: self.catalog.clone(),
603            schema_name: self.schema.clone(),
604            table_name: self.table.clone(),
605        }
606    }
607}
608
609impl TryFrom<PbDropTableTask> for DropTableTask {
610    type Error = error::Error;
611
612    fn try_from(pb: PbDropTableTask) -> Result<Self> {
613        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
614            err_msg: "expected drop table",
615        })?;
616
617        Ok(Self {
618            catalog: drop_table.catalog_name,
619            schema: drop_table.schema_name,
620            table: drop_table.table_name,
621            table_id: drop_table
622                .table_id
623                .context(error::InvalidProtoMsgSnafu {
624                    err_msg: "expected table_id",
625                })?
626                .id,
627            drop_if_exists: drop_table.drop_if_exists,
628        })
629    }
630}
631
632impl From<DropTableTask> for PbDropTableTask {
633    fn from(task: DropTableTask) -> Self {
634        PbDropTableTask {
635            drop_table: Some(DropTableExpr {
636                catalog_name: task.catalog,
637                schema_name: task.schema,
638                table_name: task.table,
639                table_id: Some(api::v1::TableId { id: task.table_id }),
640                drop_if_exists: task.drop_if_exists,
641            }),
642        }
643    }
644}
645
/// A `CREATE TABLE` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The create-table expression.
    pub create_table: CreateTableExpr,
    /// Partition definitions for the table.
    pub partitions: Vec<Partition>,
    /// Full table metadata; its table id can be filled in later via
    /// [`CreateTableTask::set_table_id`].
    pub table_info: TableInfo,
}
652
653impl TryFrom<PbCreateTableTask> for CreateTableTask {
654    type Error = error::Error;
655
656    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
657        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
658
659        Ok(CreateTableTask::new(
660            pb.create_table.context(error::InvalidProtoMsgSnafu {
661                err_msg: "expected create table",
662            })?,
663            pb.partitions,
664            table_info,
665        ))
666    }
667}
668
669impl TryFrom<CreateTableTask> for PbCreateTableTask {
670    type Error = error::Error;
671
672    fn try_from(task: CreateTableTask) -> Result<Self> {
673        Ok(PbCreateTableTask {
674            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
675            create_table: Some(task.create_table),
676            partitions: task.partitions,
677        })
678    }
679}
680
impl CreateTableTask {
    /// Bundles the expression, partition rules and table metadata into a task.
    pub fn new(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: TableInfo,
    ) -> CreateTableTask {
        CreateTableTask {
            create_table: expr,
            partitions,
            table_info,
        }
    }

    /// Returns the owned, fully-qualified name of the table to create.
    pub fn table_name(&self) -> TableName {
        let table = &self.create_table;

        TableName {
            catalog_name: table.catalog_name.clone(),
            schema_name: table.schema_name.clone(),
            table_name: table.table_name.clone(),
        }
    }

    /// Returns a borrowed [`TableReference`] to the table to create.
    pub fn table_ref(&self) -> TableReference<'_> {
        let table = &self.create_table;

        TableReference {
            catalog: &table.catalog_name,
            schema: &table.schema_name,
            table: &table.table_name,
        }
    }

    /// Sets the `table_info`'s table_id.
    pub fn set_table_id(&mut self, table_id: TableId) {
        self.table_info.ident.table_id = table_id;
    }

    /// Sort the columns in [CreateTableExpr] and [TableInfo].
    ///
    /// This function won't do any check or verification. Caller should
    /// ensure this task is valid.
    pub fn sort_columns(&mut self) {
        // sort create table expr
        // sort column_defs by name
        self.create_table
            .column_defs
            .sort_unstable_by(|a, b| a.name.cmp(&b.name));

        // keep the `TableInfo` column order consistent with the expression
        self.table_info.sort_columns();
    }
}
733
impl Serialize for CreateTableTask {
    /// Serializes the task as a base64 (`STANDARD_NO_PAD`) string of its
    /// protobuf encoding, with `table_info` embedded as JSON bytes.
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `TableInfo` is not a protobuf message, so it is JSON-encoded first.
        let table_info = serde_json::to_vec(&self.table_info)
            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

        let pb = PbCreateTableTask {
            create_table: Some(self.create_table.clone()),
            partitions: self.partitions.clone(),
            table_info,
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}
752
impl<'de> Deserialize<'de> for CreateTableTask {
    /// Inverse of `Serialize`: base64-decode, protobuf-decode, then convert
    /// via [`CreateTableTask::try_from`].
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = CreateTableTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}
771
/// An `ALTER TABLE` task.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    /// The alter-table expression; its `kind` must be present (see `validate`).
    pub alter_table: AlterTableExpr,
}
777
778impl AlterTableTask {
779    pub fn validate(&self) -> Result<()> {
780        self.alter_table
781            .kind
782            .as_ref()
783            .context(error::UnexpectedSnafu {
784                err_msg: "'kind' is absent",
785            })?;
786        Ok(())
787    }
788
789    pub fn table_ref(&self) -> TableReference<'_> {
790        TableReference {
791            catalog: &self.alter_table.catalog_name,
792            schema: &self.alter_table.schema_name,
793            table: &self.alter_table.table_name,
794        }
795    }
796
797    pub fn table_name(&self) -> TableName {
798        let table = &self.alter_table;
799
800        TableName {
801            catalog_name: table.catalog_name.clone(),
802            schema_name: table.schema_name.clone(),
803            table_name: table.table_name.clone(),
804        }
805    }
806}
807
808impl TryFrom<PbAlterTableTask> for AlterTableTask {
809    type Error = error::Error;
810
811    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
812        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
813            err_msg: "expected alter_table",
814        })?;
815
816        Ok(AlterTableTask { alter_table })
817    }
818}
819
820impl TryFrom<AlterTableTask> for PbAlterTableTask {
821    type Error = error::Error;
822
823    fn try_from(task: AlterTableTask) -> Result<Self> {
824        Ok(PbAlterTableTask {
825            alter_table: Some(task.alter_table),
826        })
827    }
828}
829
impl Serialize for AlterTableTask {
    /// Serializes the task as a base64 (`STANDARD_NO_PAD`) string of its
    /// protobuf encoding.
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let pb = PbAlterTableTask {
            alter_table: Some(self.alter_table.clone()),
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}
843
impl<'de> Deserialize<'de> for AlterTableTask {
    /// Inverse of `Serialize`: base64-decode, protobuf-decode, then convert
    /// via [`AlterTableTask::try_from`].
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = AlterTableTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}
862
/// A `TRUNCATE TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    /// Catalog of the table.
    pub catalog: String,
    /// Schema of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Time ranges restricting the truncation; empty when none were specified.
    pub time_ranges: Vec<(Timestamp, Timestamp)>,
}
871
872impl TruncateTableTask {
873    pub fn table_ref(&self) -> TableReference<'_> {
874        TableReference {
875            catalog: &self.catalog,
876            schema: &self.schema,
877            table: &self.table,
878        }
879    }
880
881    pub fn table_name(&self) -> TableName {
882        TableName {
883            catalog_name: self.catalog.clone(),
884            schema_name: self.schema.clone(),
885            table_name: self.table.clone(),
886        }
887    }
888}
889
impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
    type Error = error::Error;

    /// Decodes the protobuf task; requires the expression and its table id.
    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected truncate table",
        })?;

        Ok(Self {
            catalog: truncate_table.catalog_name,
            schema: truncate_table.schema_name,
            table: truncate_table.table_name,
            table_id: truncate_table
                .table_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected table_id",
                })?
                .id,
            // `time_ranges` is optional in the proto: convert when present
            // (surfacing conversion failures as external errors), otherwise
            // fall back to an empty list.
            time_ranges: truncate_table
                .time_ranges
                .map(from_pb_time_ranges)
                .transpose()
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .unwrap_or_default(),
        })
    }
}
918
impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
    type Error = error::Error;

    /// Encodes the task back into its protobuf envelope; the only fallible
    /// step is converting the time ranges.
    fn try_from(task: TruncateTableTask) -> Result<Self> {
        Ok(PbTruncateTableTask {
            truncate_table: Some(TruncateTableExpr {
                catalog_name: task.catalog,
                schema_name: task.schema,
                table_name: task.table,
                table_id: Some(api::v1::TableId { id: task.table_id }),
                time_ranges: Some(
                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
                ),
            }),
        })
    }
}
936
/// A create-database DDL task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    pub catalog: String,
    pub schema: String,
    pub create_if_not_exists: bool,
    // Tolerate `null` in previously persisted JSON by falling back to an
    // empty map instead of failing deserialization.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
946
947impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
948    type Error = error::Error;
949
950    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
951        let CreateDatabaseExpr {
952            catalog_name,
953            schema_name,
954            create_if_not_exists,
955            options,
956        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
957            err_msg: "expected create database",
958        })?;
959
960        Ok(CreateDatabaseTask {
961            catalog: catalog_name,
962            schema: schema_name,
963            create_if_not_exists,
964            options,
965        })
966    }
967}
968
969impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
970    type Error = error::Error;
971
972    fn try_from(
973        CreateDatabaseTask {
974            catalog,
975            schema,
976            create_if_not_exists,
977            options,
978        }: CreateDatabaseTask,
979    ) -> Result<Self> {
980        Ok(PbCreateDatabaseTask {
981            create_database: Some(CreateDatabaseExpr {
982                catalog_name: catalog,
983                schema_name: schema,
984                create_if_not_exists,
985                options,
986            }),
987        })
988    }
989}
990
/// A drop-database DDL task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    pub catalog: String,
    pub schema: String,
    /// When true, dropping a non-existent database is not an error.
    pub drop_if_exists: bool,
}
997
998impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
999    type Error = error::Error;
1000
1001    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1002        let DropDatabaseExpr {
1003            catalog_name,
1004            schema_name,
1005            drop_if_exists,
1006        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1007            err_msg: "expected drop database",
1008        })?;
1009
1010        Ok(DropDatabaseTask {
1011            catalog: catalog_name,
1012            schema: schema_name,
1013            drop_if_exists,
1014        })
1015    }
1016}
1017
1018impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1019    type Error = error::Error;
1020
1021    fn try_from(
1022        DropDatabaseTask {
1023            catalog,
1024            schema,
1025            drop_if_exists,
1026        }: DropDatabaseTask,
1027    ) -> Result<Self> {
1028        Ok(PbDropDatabaseTask {
1029            drop_database: Some(DropDatabaseExpr {
1030                catalog_name: catalog,
1031                schema_name: schema,
1032                drop_if_exists,
1033            }),
1034        })
1035    }
1036}
1037
/// An alter-database DDL task wrapping the raw alter expression.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    pub alter_expr: AlterDatabaseExpr,
}
1042
1043impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1044    type Error = error::Error;
1045
1046    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1047        Ok(PbAlterDatabaseTask {
1048            task: Some(task.alter_expr),
1049        })
1050    }
1051}
1052
1053impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1054    type Error = error::Error;
1055
1056    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1057        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1058            err_msg: "expected alter database",
1059        })?;
1060
1061        Ok(AlterDatabaseTask { alter_expr })
1062    }
1063}
1064
1065impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1066    type Error = error::Error;
1067
1068    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1069        match pb {
1070            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1071                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1072                    options
1073                        .set_database_options
1074                        .into_iter()
1075                        .map(SetDatabaseOption::try_from)
1076                        .collect::<Result<Vec<_>>>()?,
1077                )))
1078            }
1079            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1080                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1081                    options
1082                        .keys
1083                        .iter()
1084                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1085                        .collect::<Result<Vec<_>>>()?,
1086                )),
1087            ),
1088        }
1089    }
1090}
1091
/// Database option key for the time-to-live setting.
const TTL_KEY: &str = "ttl";
1093
1094impl TryFrom<PbOption> for SetDatabaseOption {
1095    type Error = error::Error;
1096
1097    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1098        let key_lower = key.to_ascii_lowercase();
1099        match key_lower.as_str() {
1100            TTL_KEY => {
1101                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1102                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1103
1104                Ok(SetDatabaseOption::Ttl(ttl))
1105            }
1106            _ => {
1107                if validate_database_option(&key_lower) {
1108                    Ok(SetDatabaseOption::Other(key_lower, value))
1109                } else {
1110                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1111                }
1112            }
1113        }
1114    }
1115}
1116
/// A single database option being set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// Time-to-live for the database's data.
    Ttl(DatabaseTimeToLive),
    /// Any other validated option as a (key, value) pair; the key is
    /// stored lowercased (see the `TryFrom<PbOption>` impl above).
    Other(String, String),
}

/// A single database option being unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Remove the time-to-live setting.
    Ttl,
    /// Any other validated option key, stored lowercased.
    Other(String),
}
1128
1129impl TryFrom<&str> for UnsetDatabaseOption {
1130    type Error = error::Error;
1131
1132    fn try_from(key: &str) -> Result<Self> {
1133        let key_lower = key.to_ascii_lowercase();
1134        match key_lower.as_str() {
1135            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1136            _ => {
1137                if validate_database_option(&key_lower) {
1138                    Ok(UnsetDatabaseOption::Other(key_lower))
1139                } else {
1140                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1141                }
1142            }
1143        }
1144    }
1145}
1146
/// The collection of options an alter-database task sets.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);

/// The collection of option keys an alter-database task unsets.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);

/// The kind of change an alter-database task performs.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    SetDatabaseOptions(SetDatabaseOptions),
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1158
1159impl AlterDatabaseTask {
1160    pub fn catalog(&self) -> &str {
1161        &self.alter_expr.catalog_name
1162    }
1163
1164    pub fn schema(&self) -> &str {
1165        &self.alter_expr.catalog_name
1166    }
1167}
1168
/// Create flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    /// Tables the flow reads from.
    pub source_table_names: Vec<TableName>,
    /// Table the flow writes its results to.
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    /// Duration in seconds. Data older than this duration will not be used.
    pub expire_after: Option<i64>,
    /// Evaluation interval in seconds; `None` when not specified.
    pub eval_interval_secs: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1185
impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
    type Error = error::Error;

    /// Decodes the protobuf create-flow task.
    ///
    /// Fails if the inner `create_flow` expression or its `sink_table_name`
    /// is missing.
    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
        let CreateFlowExpr {
            catalog_name,
            flow_name,
            source_table_names,
            sink_table_name,
            or_replace,
            create_if_not_exists,
            expire_after,
            eval_interval,
            comment,
            sql,
            flow_options,
        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected create_flow",
        })?;

        Ok(CreateFlowTask {
            catalog_name,
            flow_name,
            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
            sink_table_name: sink_table_name
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected sink_table_name",
                })?
                .into(),
            or_replace,
            create_if_not_exists,
            // Unwrap the proto wrapper messages into plain integers.
            expire_after: expire_after.map(|e| e.value),
            eval_interval_secs: eval_interval.map(|e| e.seconds),
            comment,
            sql,
            flow_options,
        })
    }
}
1225
impl From<CreateFlowTask> for PbCreateFlowTask {
    /// Encodes the create-flow task into its protobuf envelope, wrapping
    /// the plain integer fields back into their proto wrapper messages.
    fn from(
        CreateFlowTask {
            catalog_name,
            flow_name,
            source_table_names,
            sink_table_name,
            or_replace,
            create_if_not_exists,
            expire_after,
            eval_interval_secs: eval_interval,
            comment,
            sql,
            flow_options,
        }: CreateFlowTask,
    ) -> Self {
        PbCreateFlowTask {
            create_flow: Some(CreateFlowExpr {
                catalog_name,
                flow_name,
                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
                sink_table_name: Some(sink_table_name.into()),
                or_replace,
                create_if_not_exists,
                expire_after: expire_after.map(|value| ExpireAfter { value }),
                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
                comment,
                sql,
                flow_options,
            }),
        }
    }
}
1259
/// Drop flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    pub flow_id: FlowId,
    /// When true, dropping a non-existent flow is not an error.
    pub drop_if_exists: bool,
}
1268
1269impl TryFrom<PbDropFlowTask> for DropFlowTask {
1270    type Error = error::Error;
1271
1272    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1273        let DropFlowExpr {
1274            catalog_name,
1275            flow_name,
1276            flow_id,
1277            drop_if_exists,
1278        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1279            err_msg: "expected drop_flow",
1280        })?;
1281        let flow_id = flow_id
1282            .context(error::InvalidProtoMsgSnafu {
1283                err_msg: "expected flow_id",
1284            })?
1285            .id;
1286        Ok(DropFlowTask {
1287            catalog_name,
1288            flow_name,
1289            flow_id,
1290            drop_if_exists,
1291        })
1292    }
1293}
1294
1295impl From<DropFlowTask> for PbDropFlowTask {
1296    fn from(
1297        DropFlowTask {
1298            catalog_name,
1299            flow_name,
1300            flow_id,
1301            drop_if_exists,
1302        }: DropFlowTask,
1303    ) -> Self {
1304        PbDropFlowTask {
1305            drop_flow: Some(DropFlowExpr {
1306                catalog_name,
1307                flow_name,
1308                flow_id: Some(api::v1::FlowId { id: flow_id }),
1309                drop_if_exists,
1310            }),
1311        }
1312    }
1313}
1314
/// Represents the ID of the object being commented on (Table or Flow).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
    Table(TableId),
    Flow(FlowId),
}

/// Comment on table, column, or flow
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
    pub catalog_name: String,
    pub schema_name: String,
    pub object_type: CommentObjectType,
    pub object_name: String,
    /// Column name (only for Column comments)
    pub column_name: Option<String>,
    /// Object ID (Table or Flow) for validation and cache invalidation
    pub object_id: Option<CommentObjectId>,
    /// The comment text; `None` clears/omits the comment.
    pub comment: Option<String>,
}

/// The kind of object a COMMENT ON statement targets.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
    Table,
    Column,
    Flow,
}
1342
1343impl CommentOnTask {
1344    pub fn table_ref(&self) -> TableReference<'_> {
1345        TableReference {
1346            catalog: &self.catalog_name,
1347            schema: &self.schema_name,
1348            table: &self.object_name,
1349        }
1350    }
1351}
1352
1353// Proto conversions for CommentObjectType
1354impl From<CommentObjectType> for PbCommentObjectType {
1355    fn from(object_type: CommentObjectType) -> Self {
1356        match object_type {
1357            CommentObjectType::Table => PbCommentObjectType::Table,
1358            CommentObjectType::Column => PbCommentObjectType::Column,
1359            CommentObjectType::Flow => PbCommentObjectType::Flow,
1360        }
1361    }
1362}
1363
1364impl TryFrom<i32> for CommentObjectType {
1365    type Error = error::Error;
1366
1367    fn try_from(value: i32) -> Result<Self> {
1368        match value {
1369            0 => Ok(CommentObjectType::Table),
1370            1 => Ok(CommentObjectType::Column),
1371            2 => Ok(CommentObjectType::Flow),
1372            _ => error::InvalidProtoMsgSnafu {
1373                err_msg: format!(
1374                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1375                    value
1376                ),
1377            }
1378            .fail(),
1379        }
1380    }
1381}
1382
1383// Proto conversions for CommentOnTask
1384impl TryFrom<PbCommentOnTask> for CommentOnTask {
1385    type Error = error::Error;
1386
1387    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1388        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1389            err_msg: "expected comment_on",
1390        })?;
1391
1392        Ok(CommentOnTask {
1393            catalog_name: comment_on.catalog_name,
1394            schema_name: comment_on.schema_name,
1395            object_type: comment_on.object_type.try_into()?,
1396            object_name: comment_on.object_name,
1397            column_name: if comment_on.column_name.is_empty() {
1398                None
1399            } else {
1400                Some(comment_on.column_name)
1401            },
1402            comment: if comment_on.comment.is_empty() {
1403                None
1404            } else {
1405                Some(comment_on.comment)
1406            },
1407            object_id: None,
1408        })
1409    }
1410}
1411
1412impl From<CommentOnTask> for PbCommentOnTask {
1413    fn from(task: CommentOnTask) -> Self {
1414        let pb_object_type: PbCommentObjectType = task.object_type.into();
1415        PbCommentOnTask {
1416            comment_on: Some(CommentOnExpr {
1417                catalog_name: task.catalog_name,
1418                schema_name: task.schema_name,
1419                object_type: pb_object_type as i32,
1420                object_name: task.object_name,
1421                column_name: task.column_name.unwrap_or_default(),
1422                comment: task.comment.unwrap_or_default(),
1423            }),
1424        }
1425    }
1426}
1427
/// Serializable query context carried alongside DDL tasks.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct QueryContext {
    pub current_catalog: String,
    pub current_schema: String,
    pub timezone: String,
    pub extensions: HashMap<String, String>,
    pub channel: u8,
    /// Maps region id -> snapshot upper bound sequence for that region.
    #[serde(default)]
    pub snapshot_seqs: HashMap<u64, u64>,
    /// Maps region id -> minimal SST sequence allowed for that region.
    #[serde(default)]
    pub sst_min_sequences: HashMap<u64, u64>,
}
1442
impl QueryContext {
    /// Get the current catalog
    pub fn current_catalog(&self) -> &str {
        &self.current_catalog
    }

    /// Get the current schema
    pub fn current_schema(&self) -> &str {
        &self.current_schema
    }

    /// Get the timezone
    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    /// Get the extensions
    pub fn extensions(&self) -> &HashMap<String, String> {
        &self.extensions
    }

    /// Get the channel
    pub fn channel(&self) -> u8 {
        self.channel
    }

    /// Get the per-region snapshot upper-bound sequences.
    pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
        &self.snapshot_seqs
    }

    /// Get the per-region minimal allowed SST sequences.
    pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
        &self.sst_min_sequences
    }
}
1477
/// Lightweight query context for flow operations containing only essential fields.
/// This is a subset of QueryContext that includes only the fields actually needed
/// for flow creation and execution.
///
/// NOTE: `Deserialize` is implemented manually below to also accept legacy
/// full `QueryContext` payloads; the `#[serde(default)]` attributes here are
/// mirrored by the helper struct in that impl.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
    /// Current catalog name used for flow metadata and execution.
    pub catalog: String,
    /// Current schema name used for table resolution during flow execution.
    pub schema: String,
    /// Timezone used for timestamp evaluation in the flow.
    pub timezone: String,
    /// Query extensions carried into flow execution.
    #[serde(default)]
    pub extensions: HashMap<String, String>,
    /// Request channel propagated from the original query context.
    #[serde(default)]
    pub channel: u8,
    /// Per-region snapshot upper bounds bound during query planning/execution.
    #[serde(default)]
    pub snapshot_seqs: HashMap<u64, u64>,
    /// Per-region lower SST scan bounds carried with the flow context.
    #[serde(default)]
    pub sst_min_sequences: HashMap<u64, u64>,
}
1502
impl<'de> Deserialize<'de> for FlowQueryContext {
    /// Deserializes a `FlowQueryContext`, accepting either its native shape
    /// or a legacy full `QueryContext` payload (converted via `From`).
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Support both QueryContext format and FlowQueryContext format
        // `untagged` tries variants in order: the flow-specific shape first,
        // then the full context as a fallback.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum ContextCompat {
            Flow(FlowQueryContextHelper),
            Full(QueryContext),
        }

        // Mirror of `FlowQueryContext` used to drive a derived deserializer
        // (the outer type implements `Deserialize` manually, so it cannot
        // derive one itself).
        #[derive(Deserialize)]
        struct FlowQueryContextHelper {
            catalog: String,
            schema: String,
            timezone: String,
            #[serde(default)]
            extensions: HashMap<String, String>,
            #[serde(default)]
            channel: u8,
            #[serde(default)]
            snapshot_seqs: HashMap<u64, u64>,
            #[serde(default)]
            sst_min_sequences: HashMap<u64, u64>,
        }

        match ContextCompat::deserialize(deserializer)? {
            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
                catalog: helper.catalog,
                schema: helper.schema,
                timezone: helper.timezone,
                extensions: helper.extensions,
                channel: helper.channel,
                snapshot_seqs: helper.snapshot_seqs,
                sst_min_sequences: helper.sst_min_sequences,
            }),
            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
        }
    }
}
1545
1546impl From<PbQueryContext> for QueryContext {
1547    fn from(pb_ctx: PbQueryContext) -> Self {
1548        let (snapshot_seqs, sst_min_sequences) = pb_ctx
1549            .snapshot_seqs
1550            .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
1551            .unwrap_or_default();
1552
1553        Self {
1554            current_catalog: pb_ctx.current_catalog,
1555            current_schema: pb_ctx.current_schema,
1556            timezone: pb_ctx.timezone,
1557            extensions: pb_ctx.extensions,
1558            channel: pb_ctx.channel as u8,
1559            snapshot_seqs,
1560            sst_min_sequences,
1561        }
1562    }
1563}
1564
1565impl From<QueryContext> for PbQueryContext {
1566    fn from(
1567        QueryContext {
1568            current_catalog,
1569            current_schema,
1570            timezone,
1571            extensions,
1572            channel,
1573            snapshot_seqs,
1574            sst_min_sequences,
1575        }: QueryContext,
1576    ) -> Self {
1577        PbQueryContext {
1578            current_catalog,
1579            current_schema,
1580            timezone,
1581            extensions,
1582            channel: channel as u32,
1583            snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
1584                api::v1::SnapshotSequences {
1585                    snapshot_seqs,
1586                    sst_min_sequences,
1587                },
1588            ),
1589            explain: None,
1590        }
1591    }
1592}
1593
1594impl From<QueryContext> for FlowQueryContext {
1595    fn from(ctx: QueryContext) -> Self {
1596        Self {
1597            catalog: ctx.current_catalog,
1598            schema: ctx.current_schema,
1599            timezone: ctx.timezone,
1600            extensions: ctx.extensions,
1601            channel: ctx.channel,
1602            snapshot_seqs: ctx.snapshot_seqs,
1603            sst_min_sequences: ctx.sst_min_sequences,
1604        }
1605    }
1606}
1607
1608impl From<FlowQueryContext> for QueryContext {
1609    fn from(flow_ctx: FlowQueryContext) -> Self {
1610        Self {
1611            current_catalog: flow_ctx.catalog,
1612            current_schema: flow_ctx.schema,
1613            timezone: flow_ctx.timezone,
1614            extensions: flow_ctx.extensions,
1615            channel: flow_ctx.channel,
1616            snapshot_seqs: flow_ctx.snapshot_seqs,
1617            sst_min_sequences: flow_ctx.sst_min_sequences,
1618        }
1619    }
1620}
1621
1622impl From<FlowQueryContext> for PbQueryContext {
1623    fn from(flow_ctx: FlowQueryContext) -> Self {
1624        let query_ctx: QueryContext = flow_ctx.into();
1625        query_ctx.into()
1626    }
1627}
1628
1629#[cfg(test)]
1630mod tests {
1631    use std::sync::Arc;
1632
1633    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1634    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
1635    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1636    use store_api::storage::ConcreteDataType;
1637    use table::metadata::{TableInfo, TableMeta, TableType};
1638    use table::test_util::table_info::test_table_info;
1639
1640    use super::{AlterTableTask, CreateTableTask, *};
1641
1642    #[test]
1643    fn test_basic_ser_de_create_table_task() {
1644        let schema = SchemaBuilder::default().build().unwrap();
1645        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1646        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);
1647
1648        let output = serde_json::to_vec(&task).unwrap();
1649
1650        let de = serde_json::from_slice(&output).unwrap();
1651        assert_eq!(task, de);
1652    }
1653
1654    #[test]
1655    fn test_basic_ser_de_alter_table_task() {
1656        let task = AlterTableTask {
1657            alter_table: AlterTableExpr::default(),
1658        };
1659
1660        let output = serde_json::to_vec(&task).unwrap();
1661
1662        let de = serde_json::from_slice(&output).unwrap();
1663        assert_eq!(task, de);
1664    }
1665
1666    #[test]
1667    fn test_sort_columns() {
1668        // construct RawSchema
1669        let schema = Arc::new(Schema::new(vec![
1670            ColumnSchema::new(
1671                "column3".to_string(),
1672                ConcreteDataType::string_datatype(),
1673                true,
1674            ),
1675            ColumnSchema::new(
1676                "column1".to_string(),
1677                ConcreteDataType::timestamp_millisecond_datatype(),
1678                false,
1679            )
1680            .with_time_index(true),
1681            ColumnSchema::new(
1682                "column2".to_string(),
1683                ConcreteDataType::float64_datatype(),
1684                true,
1685            ),
1686        ]));
1687
1688        // construct RawTableMeta
1689        let meta = TableMeta {
1690            schema,
1691            primary_key_indices: vec![0],
1692            value_indices: vec![2],
1693            engine: METRIC_ENGINE_NAME.to_string(),
1694            next_column_id: 0,
1695            options: Default::default(),
1696            created_on: Default::default(),
1697            updated_on: Default::default(),
1698            partition_key_indices: Default::default(),
1699            column_ids: Default::default(),
1700        };
1701
1702        // construct TableInfo
1703        let raw_table_info = TableInfo {
1704            ident: Default::default(),
1705            meta,
1706            name: Default::default(),
1707            desc: Default::default(),
1708            catalog_name: Default::default(),
1709            schema_name: Default::default(),
1710            table_type: TableType::Base,
1711        };
1712
1713        // construct create table expr
1714        let create_table_expr = CreateTableExpr {
1715            column_defs: vec![
1716                ColumnDef {
1717                    name: "column3".to_string(),
1718                    semantic_type: SemanticType::Tag as i32,
1719                    ..Default::default()
1720                },
1721                ColumnDef {
1722                    name: "column1".to_string(),
1723                    semantic_type: SemanticType::Timestamp as i32,
1724                    ..Default::default()
1725                },
1726                ColumnDef {
1727                    name: "column2".to_string(),
1728                    semantic_type: SemanticType::Field as i32,
1729                    ..Default::default()
1730                },
1731            ],
1732            primary_keys: vec!["column3".to_string()],
1733            ..Default::default()
1734        };
1735
1736        let mut create_table_task =
1737            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1738
1739        // Call the sort_columns method
1740        create_table_task.sort_columns();
1741
1742        // Assert that the columns are sorted correctly
1743        assert_eq!(
1744            create_table_task.create_table.column_defs[0].name,
1745            "column1".to_string()
1746        );
1747        assert_eq!(
1748            create_table_task.create_table.column_defs[1].name,
1749            "column2".to_string()
1750        );
1751        assert_eq!(
1752            create_table_task.create_table.column_defs[2].name,
1753            "column3".to_string()
1754        );
1755
1756        // Assert that the table_info is updated correctly
1757        assert_eq!(
1758            create_table_task.table_info.meta.schema.timestamp_index(),
1759            Some(0)
1760        );
1761        assert_eq!(
1762            create_table_task.table_info.meta.primary_key_indices,
1763            vec![2]
1764        );
1765        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1766    }
1767
1768    #[test]
1769    fn test_flow_query_context_conversion_from_query_context() {
1770        use std::collections::HashMap;
1771        let mut extensions = HashMap::new();
1772        extensions.insert("key1".to_string(), "value1".to_string());
1773        extensions.insert("key2".to_string(), "value2".to_string());
1774
1775        let query_ctx = QueryContext {
1776            current_catalog: "test_catalog".to_string(),
1777            current_schema: "test_schema".to_string(),
1778            timezone: "UTC".to_string(),
1779            extensions,
1780            channel: 5,
1781            snapshot_seqs: HashMap::from([(10, 100)]),
1782            sst_min_sequences: HashMap::from([(10, 90)]),
1783        };
1784
1785        let flow_ctx: FlowQueryContext = query_ctx.into();
1786
1787        assert_eq!(flow_ctx.catalog, "test_catalog");
1788        assert_eq!(flow_ctx.schema, "test_schema");
1789        assert_eq!(flow_ctx.timezone, "UTC");
1790        assert_eq!(flow_ctx.channel, 5);
1791        assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
1792        assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
1793    }
1794
1795    #[test]
1796    fn test_flow_query_context_conversion_to_query_context() {
1797        let flow_ctx = FlowQueryContext {
1798            catalog: "prod_catalog".to_string(),
1799            schema: "public".to_string(),
1800            timezone: "America/New_York".to_string(),
1801            extensions: HashMap::from([("k".to_string(), "v".to_string())]),
1802            channel: 7,
1803            snapshot_seqs: HashMap::from([(11, 111)]),
1804            sst_min_sequences: HashMap::from([(11, 101)]),
1805        };
1806
1807        let query_ctx: QueryContext = flow_ctx.clone().into();
1808
1809        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1810        assert_eq!(query_ctx.current_schema, "public");
1811        assert_eq!(query_ctx.timezone, "America/New_York");
1812        assert_eq!(
1813            query_ctx.extensions,
1814            HashMap::from([("k".to_string(), "v".to_string())])
1815        );
1816        assert_eq!(query_ctx.channel, 7);
1817        assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
1818        assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));
1819
1820        // Test roundtrip conversion
1821        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1822        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1823    }
1824
1825    #[test]
1826    fn test_flow_query_context_serialization() {
1827        let flow_ctx = FlowQueryContext {
1828            catalog: "test_catalog".to_string(),
1829            schema: "test_schema".to_string(),
1830            timezone: "UTC".to_string(),
1831            extensions: HashMap::new(),
1832            channel: 0,
1833            snapshot_seqs: HashMap::new(),
1834            sst_min_sequences: HashMap::new(),
1835        };
1836
1837        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1838        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1839
1840        assert_eq!(flow_ctx, deserialized);
1841
1842        // Verify JSON structure
1843        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1844        assert_eq!(json_value["catalog"], "test_catalog");
1845        assert_eq!(json_value["schema"], "test_schema");
1846        assert_eq!(json_value["timezone"], "UTC");
1847    }
1848
1849    #[test]
1850    fn test_flow_query_context_conversion_to_pb() {
1851        let flow_ctx = FlowQueryContext {
1852            catalog: "pb_catalog".to_string(),
1853            schema: "pb_schema".to_string(),
1854            timezone: "Asia/Tokyo".to_string(),
1855            extensions: HashMap::from([("x".to_string(), "y".to_string())]),
1856            channel: 6,
1857            snapshot_seqs: HashMap::from([(3, 30)]),
1858            sst_min_sequences: HashMap::from([(3, 21)]),
1859        };
1860
1861        let pb_ctx: PbQueryContext = flow_ctx.into();
1862
1863        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1864        assert_eq!(pb_ctx.current_schema, "pb_schema");
1865        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1866        assert_eq!(
1867            pb_ctx.extensions,
1868            HashMap::from([("x".to_string(), "y".to_string())])
1869        );
1870        assert_eq!(pb_ctx.channel, 6);
1871        assert_eq!(
1872            pb_ctx.snapshot_seqs,
1873            Some(api::v1::SnapshotSequences {
1874                snapshot_seqs: HashMap::from([(3, 30)]),
1875                sst_min_sequences: HashMap::from([(3, 21)]),
1876            })
1877        );
1878        assert!(pb_ctx.explain.is_none());
1879    }
1880
1881    #[test]
1882    fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
1883        let pb = PbQueryContext {
1884            current_catalog: "c1".to_string(),
1885            current_schema: "s1".to_string(),
1886            timezone: "UTC".to_string(),
1887            extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
1888            channel: 3,
1889            snapshot_seqs: Some(api::v1::SnapshotSequences {
1890                snapshot_seqs: HashMap::from([(1, 100)]),
1891                sst_min_sequences: HashMap::from([(1, 90)]),
1892            }),
1893            explain: None,
1894        };
1895
1896        let query_ctx: QueryContext = pb.clone().into();
1897        let pb_roundtrip: PbQueryContext = query_ctx.into();
1898
1899        assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
1900        assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
1901        assert_eq!(pb_roundtrip.timezone, pb.timezone);
1902        assert_eq!(pb_roundtrip.extensions, pb.extensions);
1903        assert_eq!(pb_roundtrip.channel, pb.channel);
1904        assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
1905    }
1906}