Skip to main content

common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20use std::time::Duration;
21
22use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
23use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
24use api::v1::meta::ddl_task_request::Task;
25use api::v1::meta::{
26    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
27    AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
28    CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
29    CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
30    CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
31    DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
32    DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
33    DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
34    TruncateTableTask as PbTruncateTableTask,
35};
36use api::v1::{
37    AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
38    CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
39    DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
40    QueryContext as PbQueryContext, TruncateTableExpr,
41};
42use base64::Engine as _;
43use base64::engine::general_purpose;
44use common_error::ext::BoxedError;
45use common_time::{DatabaseTimeToLive, Timestamp};
46use prost::Message;
47use serde::{Deserialize, Serialize};
48use serde_with::{DefaultOnNull, serde_as};
49use snafu::{OptionExt, ResultExt};
50use table::metadata::{TableId, TableInfo};
51use table::requests::validate_database_option;
52use table::table_name::TableName;
53use table::table_reference::TableReference;
54
55use crate::error::{
56    self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
57    InvalidUnsetDatabaseOptionSnafu, Result,
58};
59use crate::key::FlowId;
60
61/// Reserved query-context extension key for the frontend peer address that submitted a DDL request.
62pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";
63
64/// DDL tasks
65#[derive(Debug, Clone)]
66pub enum DdlTask {
67    CreateTable(CreateTableTask),
68    DropTable(DropTableTask),
69    AlterTable(AlterTableTask),
70    TruncateTable(TruncateTableTask),
71    CreateLogicalTables(Vec<CreateTableTask>),
72    DropLogicalTables(Vec<DropTableTask>),
73    AlterLogicalTables(Vec<AlterTableTask>),
74    CreateDatabase(CreateDatabaseTask),
75    DropDatabase(DropDatabaseTask),
76    AlterDatabase(AlterDatabaseTask),
77    CreateFlow(CreateFlowTask),
78    DropFlow(DropFlowTask),
79    #[cfg(feature = "enterprise")]
80    DropTrigger(trigger::DropTriggerTask),
81    CreateView(CreateViewTask),
82    DropView(DropViewTask),
83    #[cfg(feature = "enterprise")]
84    CreateTrigger(trigger::CreateTriggerTask),
85    CommentOn(CommentOnTask),
86}
87
88impl DdlTask {
89    /// Creates a [`DdlTask`] to create a flow.
90    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
91        DdlTask::CreateFlow(expr)
92    }
93
94    /// Creates a [`DdlTask`] to drop a flow.
95    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
96        DdlTask::DropFlow(expr)
97    }
98
99    /// Creates a [`DdlTask`] to drop a view.
100    pub fn new_drop_view(expr: DropViewTask) -> Self {
101        DdlTask::DropView(expr)
102    }
103
104    /// Creates a [`DdlTask`] to create a table.
105    pub fn new_create_table(
106        expr: CreateTableExpr,
107        partitions: Vec<Partition>,
108        table_info: TableInfo,
109    ) -> Self {
110        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
111    }
112
113    /// Creates a [`DdlTask`] to create several logical tables.
114    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, TableInfo)>) -> Self {
115        DdlTask::CreateLogicalTables(
116            table_data
117                .into_iter()
118                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
119                .collect(),
120        )
121    }
122
123    /// Creates a [`DdlTask`] to alter several logical tables.
124    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
125        DdlTask::AlterLogicalTables(
126            table_data
127                .into_iter()
128                .map(|alter_table| AlterTableTask { alter_table })
129                .collect(),
130        )
131    }
132
133    /// Creates a [`DdlTask`] to drop a table.
134    pub fn new_drop_table(
135        catalog: String,
136        schema: String,
137        table: String,
138        table_id: TableId,
139        drop_if_exists: bool,
140    ) -> Self {
141        DdlTask::DropTable(DropTableTask {
142            catalog,
143            schema,
144            table,
145            table_id,
146            drop_if_exists,
147        })
148    }
149
150    /// Creates a [`DdlTask`] to create a database.
151    pub fn new_create_database(
152        catalog: String,
153        schema: String,
154        create_if_not_exists: bool,
155        options: HashMap<String, String>,
156    ) -> Self {
157        DdlTask::CreateDatabase(CreateDatabaseTask {
158            catalog,
159            schema,
160            create_if_not_exists,
161            options,
162        })
163    }
164
165    /// Creates a [`DdlTask`] to drop a database.
166    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
167        DdlTask::DropDatabase(DropDatabaseTask {
168            catalog,
169            schema,
170            drop_if_exists,
171        })
172    }
173
174    /// Creates a [`DdlTask`] to alter a database.
175    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
176        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
177    }
178
179    /// Creates a [`DdlTask`] to alter a table.
180    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
181        DdlTask::AlterTable(AlterTableTask { alter_table })
182    }
183
184    /// Creates a [`DdlTask`] to truncate a table.
185    pub fn new_truncate_table(
186        catalog: String,
187        schema: String,
188        table: String,
189        table_id: TableId,
190        time_ranges: Vec<(Timestamp, Timestamp)>,
191    ) -> Self {
192        DdlTask::TruncateTable(TruncateTableTask {
193            catalog,
194            schema,
195            table,
196            table_id,
197            time_ranges,
198        })
199    }
200
201    /// Creates a [`DdlTask`] to create a view.
202    pub fn new_create_view(create_view: CreateViewExpr, view_info: TableInfo) -> Self {
203        DdlTask::CreateView(CreateViewTask {
204            create_view,
205            view_info,
206        })
207    }
208
209    /// Creates a [`DdlTask`] to comment on a table, column, or flow.
210    pub fn new_comment_on(task: CommentOnTask) -> Self {
211        DdlTask::CommentOn(task)
212    }
213}
214
215impl TryFrom<Task> for DdlTask {
216    type Error = error::Error;
217    fn try_from(task: Task) -> Result<Self> {
218        match task {
219            Task::CreateTableTask(create_table) => {
220                Ok(DdlTask::CreateTable(create_table.try_into()?))
221            }
222            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
223            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
224            Task::TruncateTableTask(truncate_table) => {
225                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
226            }
227            Task::CreateTableTasks(create_tables) => {
228                let tasks = create_tables
229                    .tasks
230                    .into_iter()
231                    .map(|task| task.try_into())
232                    .collect::<Result<Vec<_>>>()?;
233
234                Ok(DdlTask::CreateLogicalTables(tasks))
235            }
236            Task::DropTableTasks(drop_tables) => {
237                let tasks = drop_tables
238                    .tasks
239                    .into_iter()
240                    .map(|task| task.try_into())
241                    .collect::<Result<Vec<_>>>()?;
242
243                Ok(DdlTask::DropLogicalTables(tasks))
244            }
245            Task::AlterTableTasks(alter_tables) => {
246                let tasks = alter_tables
247                    .tasks
248                    .into_iter()
249                    .map(|task| task.try_into())
250                    .collect::<Result<Vec<_>>>()?;
251
252                Ok(DdlTask::AlterLogicalTables(tasks))
253            }
254            Task::CreateDatabaseTask(create_database) => {
255                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
256            }
257            Task::DropDatabaseTask(drop_database) => {
258                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
259            }
260            Task::AlterDatabaseTask(alter_database) => {
261                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
262            }
263            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
264            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
265            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
266            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
267            Task::CreateTriggerTask(create_trigger) => {
268                #[cfg(feature = "enterprise")]
269                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
270                #[cfg(not(feature = "enterprise"))]
271                {
272                    let _ = create_trigger;
273                    crate::error::UnsupportedSnafu {
274                        operation: "create trigger",
275                    }
276                    .fail()
277                }
278            }
279            Task::DropTriggerTask(drop_trigger) => {
280                #[cfg(feature = "enterprise")]
281                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
282                #[cfg(not(feature = "enterprise"))]
283                {
284                    let _ = drop_trigger;
285                    crate::error::UnsupportedSnafu {
286                        operation: "drop trigger",
287                    }
288                    .fail()
289                }
290            }
291            Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
292        }
293    }
294}
295
296#[derive(Clone)]
297pub struct SubmitDdlTaskRequest {
298    pub query_context: QueryContext,
299    pub wait: bool,
300    pub timeout: Duration,
301    pub task: DdlTask,
302}
303
304impl SubmitDdlTaskRequest {
305    /// The default constructor for [`SubmitDdlTaskRequest`].
306    pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
307        Self {
308            query_context,
309            wait: Self::default_wait(),
310            timeout: Self::default_timeout(),
311            task,
312        }
313    }
314
315    /// The default timeout for a DDL task.
316    pub fn default_timeout() -> Duration {
317        Duration::from_secs(60)
318    }
319
320    /// The default wait for a DDL task.
321    pub fn default_wait() -> bool {
322        true
323    }
324}
325
326impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
327    type Error = error::Error;
328
329    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
330        let task = match request.task {
331            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
332            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
333            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
334            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
335            DdlTask::CreateLogicalTables(tasks) => {
336                let tasks = tasks
337                    .into_iter()
338                    .map(|task| task.try_into())
339                    .collect::<Result<Vec<_>>>()?;
340
341                Task::CreateTableTasks(PbCreateTableTasks { tasks })
342            }
343            DdlTask::DropLogicalTables(tasks) => {
344                let tasks = tasks
345                    .into_iter()
346                    .map(|task| task.into())
347                    .collect::<Vec<_>>();
348
349                Task::DropTableTasks(PbDropTableTasks { tasks })
350            }
351            DdlTask::AlterLogicalTables(tasks) => {
352                let tasks = tasks
353                    .into_iter()
354                    .map(|task| task.try_into())
355                    .collect::<Result<Vec<_>>>()?;
356
357                Task::AlterTableTasks(PbAlterTableTasks { tasks })
358            }
359            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
360            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
361            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
362            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
363            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
364            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
365            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
366            #[cfg(feature = "enterprise")]
367            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
368            #[cfg(feature = "enterprise")]
369            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
370            DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
371        };
372
373        Ok(Self {
374            header: None,
375            query_context: Some(request.query_context.into()),
376            timeout_secs: request.timeout.as_secs() as u32,
377            wait: request.wait,
378            task: Some(task),
379        })
380    }
381}
382
383#[derive(Debug, Default)]
384pub struct SubmitDdlTaskResponse {
385    pub key: Vec<u8>,
386    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
387    pub table_ids: Vec<TableId>,
388}
389
390impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
391    type Error = error::Error;
392
393    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
394        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
395        Ok(Self {
396            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
397            table_ids,
398        })
399    }
400}
401
402impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
403    fn from(val: SubmitDdlTaskResponse) -> Self {
404        Self {
405            pid: Some(ProcedureId { key: val.key }),
406            table_ids: val
407                .table_ids
408                .into_iter()
409                .map(|id| api::v1::TableId { id })
410                .collect(),
411            ..Default::default()
412        }
413    }
414}
415
416/// A `CREATE VIEW` task.
417#[derive(Debug, PartialEq, Clone)]
418pub struct CreateViewTask {
419    pub create_view: CreateViewExpr,
420    pub view_info: TableInfo,
421}
422
423impl CreateViewTask {
424    /// Returns the [`TableReference`] of view.
425    pub fn table_ref(&self) -> TableReference<'_> {
426        TableReference {
427            catalog: &self.create_view.catalog_name,
428            schema: &self.create_view.schema_name,
429            table: &self.create_view.view_name,
430        }
431    }
432
433    /// Returns the encoded logical plan
434    pub fn raw_logical_plan(&self) -> &Vec<u8> {
435        &self.create_view.logical_plan
436    }
437
438    /// Returns the view definition in SQL
439    pub fn view_definition(&self) -> &str {
440        &self.create_view.definition
441    }
442
443    /// Returns the resolved table names in view's logical plan
444    pub fn table_names(&self) -> HashSet<TableName> {
445        self.create_view
446            .table_names
447            .iter()
448            .map(|t| t.clone().into())
449            .collect()
450    }
451
452    /// Returns the view's columns
453    pub fn columns(&self) -> &Vec<String> {
454        &self.create_view.columns
455    }
456
457    /// Returns the original logical plan's columns
458    pub fn plan_columns(&self) -> &Vec<String> {
459        &self.create_view.plan_columns
460    }
461}
462
463impl TryFrom<PbCreateViewTask> for CreateViewTask {
464    type Error = error::Error;
465
466    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
467        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
468
469        Ok(CreateViewTask {
470            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
471                err_msg: "expected create view",
472            })?,
473            view_info,
474        })
475    }
476}
477
478impl TryFrom<CreateViewTask> for PbCreateViewTask {
479    type Error = error::Error;
480
481    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
482        Ok(PbCreateViewTask {
483            create_view: Some(task.create_view),
484            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
485        })
486    }
487}
488
489impl Serialize for CreateViewTask {
490    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
491    where
492        S: serde::Serializer,
493    {
494        let view_info = serde_json::to_vec(&self.view_info)
495            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
496
497        let pb = PbCreateViewTask {
498            create_view: Some(self.create_view.clone()),
499            view_info,
500        };
501        let buf = pb.encode_to_vec();
502        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
503        serializer.serialize_str(&encoded)
504    }
505}
506
507impl<'de> Deserialize<'de> for CreateViewTask {
508    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
509    where
510        D: serde::Deserializer<'de>,
511    {
512        let encoded = String::deserialize(deserializer)?;
513        let buf = general_purpose::STANDARD_NO_PAD
514            .decode(encoded)
515            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
516        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
517            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
518
519        let expr = CreateViewTask::try_from(expr)
520            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
521
522        Ok(expr)
523    }
524}
525
526/// A `DROP VIEW` task.
527#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
528pub struct DropViewTask {
529    pub catalog: String,
530    pub schema: String,
531    pub view: String,
532    pub view_id: TableId,
533    pub drop_if_exists: bool,
534}
535
536impl DropViewTask {
537    /// Returns the [`TableReference`] of view.
538    pub fn table_ref(&self) -> TableReference<'_> {
539        TableReference {
540            catalog: &self.catalog,
541            schema: &self.schema,
542            table: &self.view,
543        }
544    }
545}
546
547impl TryFrom<PbDropViewTask> for DropViewTask {
548    type Error = error::Error;
549
550    fn try_from(pb: PbDropViewTask) -> Result<Self> {
551        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
552            err_msg: "expected drop view",
553        })?;
554
555        Ok(DropViewTask {
556            catalog: expr.catalog_name,
557            schema: expr.schema_name,
558            view: expr.view_name,
559            view_id: expr
560                .view_id
561                .context(error::InvalidProtoMsgSnafu {
562                    err_msg: "expected view_id",
563                })?
564                .id,
565            drop_if_exists: expr.drop_if_exists,
566        })
567    }
568}
569
570impl From<DropViewTask> for PbDropViewTask {
571    fn from(task: DropViewTask) -> Self {
572        PbDropViewTask {
573            drop_view: Some(DropViewExpr {
574                catalog_name: task.catalog,
575                schema_name: task.schema,
576                view_name: task.view,
577                view_id: Some(api::v1::TableId { id: task.view_id }),
578                drop_if_exists: task.drop_if_exists,
579            }),
580        }
581    }
582}
583
584#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
585pub struct DropTableTask {
586    pub catalog: String,
587    pub schema: String,
588    pub table: String,
589    pub table_id: TableId,
590    #[serde(default)]
591    pub drop_if_exists: bool,
592}
593
594impl DropTableTask {
595    pub fn table_ref(&self) -> TableReference<'_> {
596        TableReference {
597            catalog: &self.catalog,
598            schema: &self.schema,
599            table: &self.table,
600        }
601    }
602
603    pub fn table_name(&self) -> TableName {
604        TableName {
605            catalog_name: self.catalog.clone(),
606            schema_name: self.schema.clone(),
607            table_name: self.table.clone(),
608        }
609    }
610}
611
612impl TryFrom<PbDropTableTask> for DropTableTask {
613    type Error = error::Error;
614
615    fn try_from(pb: PbDropTableTask) -> Result<Self> {
616        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
617            err_msg: "expected drop table",
618        })?;
619
620        Ok(Self {
621            catalog: drop_table.catalog_name,
622            schema: drop_table.schema_name,
623            table: drop_table.table_name,
624            table_id: drop_table
625                .table_id
626                .context(error::InvalidProtoMsgSnafu {
627                    err_msg: "expected table_id",
628                })?
629                .id,
630            drop_if_exists: drop_table.drop_if_exists,
631        })
632    }
633}
634
635impl From<DropTableTask> for PbDropTableTask {
636    fn from(task: DropTableTask) -> Self {
637        PbDropTableTask {
638            drop_table: Some(DropTableExpr {
639                catalog_name: task.catalog,
640                schema_name: task.schema,
641                table_name: task.table,
642                table_id: Some(api::v1::TableId { id: task.table_id }),
643                drop_if_exists: task.drop_if_exists,
644            }),
645        }
646    }
647}
648
649#[derive(Debug, PartialEq, Clone)]
650pub struct CreateTableTask {
651    pub create_table: CreateTableExpr,
652    pub partitions: Vec<Partition>,
653    pub table_info: TableInfo,
654}
655
656impl TryFrom<PbCreateTableTask> for CreateTableTask {
657    type Error = error::Error;
658
659    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
660        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
661
662        Ok(CreateTableTask::new(
663            pb.create_table.context(error::InvalidProtoMsgSnafu {
664                err_msg: "expected create table",
665            })?,
666            pb.partitions,
667            table_info,
668        ))
669    }
670}
671
672impl TryFrom<CreateTableTask> for PbCreateTableTask {
673    type Error = error::Error;
674
675    fn try_from(task: CreateTableTask) -> Result<Self> {
676        Ok(PbCreateTableTask {
677            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
678            create_table: Some(task.create_table),
679            partitions: task.partitions,
680        })
681    }
682}
683
684impl CreateTableTask {
685    pub fn new(
686        expr: CreateTableExpr,
687        partitions: Vec<Partition>,
688        table_info: TableInfo,
689    ) -> CreateTableTask {
690        CreateTableTask {
691            create_table: expr,
692            partitions,
693            table_info,
694        }
695    }
696
697    pub fn table_name(&self) -> TableName {
698        let table = &self.create_table;
699
700        TableName {
701            catalog_name: table.catalog_name.clone(),
702            schema_name: table.schema_name.clone(),
703            table_name: table.table_name.clone(),
704        }
705    }
706
707    pub fn table_ref(&self) -> TableReference<'_> {
708        let table = &self.create_table;
709
710        TableReference {
711            catalog: &table.catalog_name,
712            schema: &table.schema_name,
713            table: &table.table_name,
714        }
715    }
716
717    /// Sets the `table_info`'s table_id.
718    pub fn set_table_id(&mut self, table_id: TableId) {
719        self.table_info.ident.table_id = table_id;
720    }
721
722    /// Sort the columns in [CreateTableExpr] and [TableInfo].
723    ///
724    /// This function won't do any check or verification. Caller should
725    /// ensure this task is valid.
726    pub fn sort_columns(&mut self) {
727        // sort create table expr
728        // sort column_defs by name
729        self.create_table
730            .column_defs
731            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
732
733        self.table_info.sort_columns();
734    }
735}
736
737impl Serialize for CreateTableTask {
738    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
739    where
740        S: serde::Serializer,
741    {
742        let table_info = serde_json::to_vec(&self.table_info)
743            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
744
745        let pb = PbCreateTableTask {
746            create_table: Some(self.create_table.clone()),
747            partitions: self.partitions.clone(),
748            table_info,
749        };
750        let buf = pb.encode_to_vec();
751        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
752        serializer.serialize_str(&encoded)
753    }
754}
755
756impl<'de> Deserialize<'de> for CreateTableTask {
757    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
758    where
759        D: serde::Deserializer<'de>,
760    {
761        let encoded = String::deserialize(deserializer)?;
762        let buf = general_purpose::STANDARD_NO_PAD
763            .decode(encoded)
764            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
765        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
766            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
767
768        let expr = CreateTableTask::try_from(expr)
769            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
770
771        Ok(expr)
772    }
773}
774
775#[derive(Debug, PartialEq, Clone)]
776pub struct AlterTableTask {
777    // TODO(CookiePieWw): Replace proto struct with user-defined struct
778    pub alter_table: AlterTableExpr,
779}
780
781impl AlterTableTask {
782    pub fn validate(&self) -> Result<()> {
783        self.alter_table
784            .kind
785            .as_ref()
786            .context(error::UnexpectedSnafu {
787                err_msg: "'kind' is absent",
788            })?;
789        Ok(())
790    }
791
792    pub fn table_ref(&self) -> TableReference<'_> {
793        TableReference {
794            catalog: &self.alter_table.catalog_name,
795            schema: &self.alter_table.schema_name,
796            table: &self.alter_table.table_name,
797        }
798    }
799
800    pub fn table_name(&self) -> TableName {
801        let table = &self.alter_table;
802
803        TableName {
804            catalog_name: table.catalog_name.clone(),
805            schema_name: table.schema_name.clone(),
806            table_name: table.table_name.clone(),
807        }
808    }
809}
810
811impl TryFrom<PbAlterTableTask> for AlterTableTask {
812    type Error = error::Error;
813
814    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
815        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
816            err_msg: "expected alter_table",
817        })?;
818
819        Ok(AlterTableTask { alter_table })
820    }
821}
822
823impl TryFrom<AlterTableTask> for PbAlterTableTask {
824    type Error = error::Error;
825
826    fn try_from(task: AlterTableTask) -> Result<Self> {
827        Ok(PbAlterTableTask {
828            alter_table: Some(task.alter_table),
829        })
830    }
831}
832
833impl Serialize for AlterTableTask {
834    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
835    where
836        S: serde::Serializer,
837    {
838        let pb = PbAlterTableTask {
839            alter_table: Some(self.alter_table.clone()),
840        };
841        let buf = pb.encode_to_vec();
842        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
843        serializer.serialize_str(&encoded)
844    }
845}
846
847impl<'de> Deserialize<'de> for AlterTableTask {
848    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
849    where
850        D: serde::Deserializer<'de>,
851    {
852        let encoded = String::deserialize(deserializer)?;
853        let buf = general_purpose::STANDARD_NO_PAD
854            .decode(encoded)
855            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
856        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
857            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
858
859        let expr = AlterTableTask::try_from(expr)
860            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
861
862        Ok(expr)
863    }
864}
865
866#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
867pub struct TruncateTableTask {
868    pub catalog: String,
869    pub schema: String,
870    pub table: String,
871    pub table_id: TableId,
872    pub time_ranges: Vec<(Timestamp, Timestamp)>,
873}
874
875impl TruncateTableTask {
876    pub fn table_ref(&self) -> TableReference<'_> {
877        TableReference {
878            catalog: &self.catalog,
879            schema: &self.schema,
880            table: &self.table,
881        }
882    }
883
884    pub fn table_name(&self) -> TableName {
885        TableName {
886            catalog_name: self.catalog.clone(),
887            schema_name: self.schema.clone(),
888            table_name: self.table.clone(),
889        }
890    }
891}
892
893impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
894    type Error = error::Error;
895
896    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
897        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
898            err_msg: "expected truncate table",
899        })?;
900
901        Ok(Self {
902            catalog: truncate_table.catalog_name,
903            schema: truncate_table.schema_name,
904            table: truncate_table.table_name,
905            table_id: truncate_table
906                .table_id
907                .context(error::InvalidProtoMsgSnafu {
908                    err_msg: "expected table_id",
909                })?
910                .id,
911            time_ranges: truncate_table
912                .time_ranges
913                .map(from_pb_time_ranges)
914                .transpose()
915                .map_err(BoxedError::new)
916                .context(ExternalSnafu)?
917                .unwrap_or_default(),
918        })
919    }
920}
921
922impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
923    type Error = error::Error;
924
925    fn try_from(task: TruncateTableTask) -> Result<Self> {
926        Ok(PbTruncateTableTask {
927            truncate_table: Some(TruncateTableExpr {
928                catalog_name: task.catalog,
929                schema_name: task.schema,
930                table_name: task.table,
931                table_id: Some(api::v1::TableId { id: task.table_id }),
932                time_ranges: Some(
933                    to_pb_time_ranges(&task.time_ranges).context(ConvertTimeRangesSnafu)?,
934                ),
935            }),
936        })
937    }
938}
939
940#[serde_as]
941#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
942pub struct CreateDatabaseTask {
943    pub catalog: String,
944    pub schema: String,
945    pub create_if_not_exists: bool,
946    #[serde_as(deserialize_as = "DefaultOnNull")]
947    pub options: HashMap<String, String>,
948}
949
950impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
951    type Error = error::Error;
952
953    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
954        let CreateDatabaseExpr {
955            catalog_name,
956            schema_name,
957            create_if_not_exists,
958            options,
959        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
960            err_msg: "expected create database",
961        })?;
962
963        Ok(CreateDatabaseTask {
964            catalog: catalog_name,
965            schema: schema_name,
966            create_if_not_exists,
967            options,
968        })
969    }
970}
971
972impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
973    type Error = error::Error;
974
975    fn try_from(
976        CreateDatabaseTask {
977            catalog,
978            schema,
979            create_if_not_exists,
980            options,
981        }: CreateDatabaseTask,
982    ) -> Result<Self> {
983        Ok(PbCreateDatabaseTask {
984            create_database: Some(CreateDatabaseExpr {
985                catalog_name: catalog,
986                schema_name: schema,
987                create_if_not_exists,
988                options,
989            }),
990        })
991    }
992}
993
994#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
995pub struct DropDatabaseTask {
996    pub catalog: String,
997    pub schema: String,
998    pub drop_if_exists: bool,
999}
1000
1001impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
1002    type Error = error::Error;
1003
1004    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
1005        let DropDatabaseExpr {
1006            catalog_name,
1007            schema_name,
1008            drop_if_exists,
1009        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
1010            err_msg: "expected drop database",
1011        })?;
1012
1013        Ok(DropDatabaseTask {
1014            catalog: catalog_name,
1015            schema: schema_name,
1016            drop_if_exists,
1017        })
1018    }
1019}
1020
1021impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
1022    type Error = error::Error;
1023
1024    fn try_from(
1025        DropDatabaseTask {
1026            catalog,
1027            schema,
1028            drop_if_exists,
1029        }: DropDatabaseTask,
1030    ) -> Result<Self> {
1031        Ok(PbDropDatabaseTask {
1032            drop_database: Some(DropDatabaseExpr {
1033                catalog_name: catalog,
1034                schema_name: schema,
1035                drop_if_exists,
1036            }),
1037        })
1038    }
1039}
1040
1041#[derive(Debug, PartialEq, Clone)]
1042pub struct AlterDatabaseTask {
1043    pub alter_expr: AlterDatabaseExpr,
1044}
1045
1046impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
1047    type Error = error::Error;
1048
1049    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
1050        Ok(PbAlterDatabaseTask {
1051            task: Some(task.alter_expr),
1052        })
1053    }
1054}
1055
1056impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1057    type Error = error::Error;
1058
1059    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1060        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1061            err_msg: "expected alter database",
1062        })?;
1063
1064        Ok(AlterDatabaseTask { alter_expr })
1065    }
1066}
1067
1068impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1069    type Error = error::Error;
1070
1071    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1072        match pb {
1073            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1074                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1075                    options
1076                        .set_database_options
1077                        .into_iter()
1078                        .map(SetDatabaseOption::try_from)
1079                        .collect::<Result<Vec<_>>>()?,
1080                )))
1081            }
1082            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1083                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1084                    options
1085                        .keys
1086                        .iter()
1087                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1088                        .collect::<Result<Vec<_>>>()?,
1089                )),
1090            ),
1091        }
1092    }
1093}
1094
1095const TTL_KEY: &str = "ttl";
1096
1097impl TryFrom<PbOption> for SetDatabaseOption {
1098    type Error = error::Error;
1099
1100    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1101        let key_lower = key.to_ascii_lowercase();
1102        match key_lower.as_str() {
1103            TTL_KEY => {
1104                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1105                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1106
1107                Ok(SetDatabaseOption::Ttl(ttl))
1108            }
1109            _ => {
1110                if validate_database_option(&key_lower) {
1111                    Ok(SetDatabaseOption::Other(key_lower, value))
1112                } else {
1113                    InvalidSetDatabaseOptionSnafu { key, value }.fail()
1114                }
1115            }
1116        }
1117    }
1118}
1119
1120#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1121pub enum SetDatabaseOption {
1122    Ttl(DatabaseTimeToLive),
1123    Other(String, String),
1124}
1125
1126#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1127pub enum UnsetDatabaseOption {
1128    Ttl,
1129    Other(String),
1130}
1131
1132impl TryFrom<&str> for UnsetDatabaseOption {
1133    type Error = error::Error;
1134
1135    fn try_from(key: &str) -> Result<Self> {
1136        let key_lower = key.to_ascii_lowercase();
1137        match key_lower.as_str() {
1138            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1139            _ => {
1140                if validate_database_option(&key_lower) {
1141                    Ok(UnsetDatabaseOption::Other(key_lower))
1142                } else {
1143                    InvalidUnsetDatabaseOptionSnafu { key }.fail()
1144                }
1145            }
1146        }
1147    }
1148}
1149
1150#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1151pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1152
1153#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1154pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1155
1156#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1157pub enum AlterDatabaseKind {
1158    SetDatabaseOptions(SetDatabaseOptions),
1159    UnsetDatabaseOptions(UnsetDatabaseOptions),
1160}
1161
1162impl AlterDatabaseTask {
1163    pub fn catalog(&self) -> &str {
1164        &self.alter_expr.catalog_name
1165    }
1166
1167    pub fn schema(&self) -> &str {
1168        &self.alter_expr.catalog_name
1169    }
1170}
1171
1172/// Create flow
1173#[derive(Debug, Clone, Serialize, Deserialize)]
1174pub struct CreateFlowTask {
1175    pub catalog_name: String,
1176    pub flow_name: String,
1177    pub source_table_names: Vec<TableName>,
1178    pub sink_table_name: TableName,
1179    pub or_replace: bool,
1180    pub create_if_not_exists: bool,
1181    /// Duration in seconds. Data older than this duration will not be used.
1182    pub expire_after: Option<i64>,
1183    pub eval_interval_secs: Option<i64>,
1184    pub comment: String,
1185    pub sql: String,
1186    pub flow_options: HashMap<String, String>,
1187}
1188
1189impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1190    type Error = error::Error;
1191
1192    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1193        let CreateFlowExpr {
1194            catalog_name,
1195            flow_name,
1196            source_table_names,
1197            sink_table_name,
1198            or_replace,
1199            create_if_not_exists,
1200            expire_after,
1201            eval_interval,
1202            comment,
1203            sql,
1204            flow_options,
1205        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1206            err_msg: "expected create_flow",
1207        })?;
1208
1209        Ok(CreateFlowTask {
1210            catalog_name,
1211            flow_name,
1212            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1213            sink_table_name: sink_table_name
1214                .context(error::InvalidProtoMsgSnafu {
1215                    err_msg: "expected sink_table_name",
1216                })?
1217                .into(),
1218            or_replace,
1219            create_if_not_exists,
1220            expire_after: expire_after.map(|e| e.value),
1221            eval_interval_secs: eval_interval.map(|e| e.seconds),
1222            comment,
1223            sql,
1224            flow_options,
1225        })
1226    }
1227}
1228
1229impl From<CreateFlowTask> for PbCreateFlowTask {
1230    fn from(
1231        CreateFlowTask {
1232            catalog_name,
1233            flow_name,
1234            source_table_names,
1235            sink_table_name,
1236            or_replace,
1237            create_if_not_exists,
1238            expire_after,
1239            eval_interval_secs: eval_interval,
1240            comment,
1241            sql,
1242            flow_options,
1243        }: CreateFlowTask,
1244    ) -> Self {
1245        PbCreateFlowTask {
1246            create_flow: Some(CreateFlowExpr {
1247                catalog_name,
1248                flow_name,
1249                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1250                sink_table_name: Some(sink_table_name.into()),
1251                or_replace,
1252                create_if_not_exists,
1253                expire_after: expire_after.map(|value| ExpireAfter { value }),
1254                eval_interval: eval_interval.map(|seconds| EvalInterval { seconds }),
1255                comment,
1256                sql,
1257                flow_options,
1258            }),
1259        }
1260    }
1261}
1262
1263/// Drop flow
1264#[derive(Debug, Clone, Serialize, Deserialize)]
1265pub struct DropFlowTask {
1266    pub catalog_name: String,
1267    pub flow_name: String,
1268    pub flow_id: FlowId,
1269    pub drop_if_exists: bool,
1270}
1271
1272impl TryFrom<PbDropFlowTask> for DropFlowTask {
1273    type Error = error::Error;
1274
1275    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1276        let DropFlowExpr {
1277            catalog_name,
1278            flow_name,
1279            flow_id,
1280            drop_if_exists,
1281        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1282            err_msg: "expected drop_flow",
1283        })?;
1284        let flow_id = flow_id
1285            .context(error::InvalidProtoMsgSnafu {
1286                err_msg: "expected flow_id",
1287            })?
1288            .id;
1289        Ok(DropFlowTask {
1290            catalog_name,
1291            flow_name,
1292            flow_id,
1293            drop_if_exists,
1294        })
1295    }
1296}
1297
1298impl From<DropFlowTask> for PbDropFlowTask {
1299    fn from(
1300        DropFlowTask {
1301            catalog_name,
1302            flow_name,
1303            flow_id,
1304            drop_if_exists,
1305        }: DropFlowTask,
1306    ) -> Self {
1307        PbDropFlowTask {
1308            drop_flow: Some(DropFlowExpr {
1309                catalog_name,
1310                flow_name,
1311                flow_id: Some(api::v1::FlowId { id: flow_id }),
1312                drop_if_exists,
1313            }),
1314        }
1315    }
1316}
1317
1318/// Represents the ID of the object being commented on (Table or Flow).
1319#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1320pub enum CommentObjectId {
1321    Table(TableId),
1322    Flow(FlowId),
1323}
1324
1325/// Comment on table, column, or flow
1326#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1327pub struct CommentOnTask {
1328    pub catalog_name: String,
1329    pub schema_name: String,
1330    pub object_type: CommentObjectType,
1331    pub object_name: String,
1332    /// Column name (only for Column comments)
1333    pub column_name: Option<String>,
1334    /// Object ID (Table or Flow) for validation and cache invalidation
1335    pub object_id: Option<CommentObjectId>,
1336    pub comment: Option<String>,
1337}
1338
1339#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1340pub enum CommentObjectType {
1341    Table,
1342    Column,
1343    Flow,
1344}
1345
1346impl CommentOnTask {
1347    pub fn table_ref(&self) -> TableReference<'_> {
1348        TableReference {
1349            catalog: &self.catalog_name,
1350            schema: &self.schema_name,
1351            table: &self.object_name,
1352        }
1353    }
1354}
1355
1356// Proto conversions for CommentObjectType
1357impl From<CommentObjectType> for PbCommentObjectType {
1358    fn from(object_type: CommentObjectType) -> Self {
1359        match object_type {
1360            CommentObjectType::Table => PbCommentObjectType::Table,
1361            CommentObjectType::Column => PbCommentObjectType::Column,
1362            CommentObjectType::Flow => PbCommentObjectType::Flow,
1363        }
1364    }
1365}
1366
1367impl TryFrom<i32> for CommentObjectType {
1368    type Error = error::Error;
1369
1370    fn try_from(value: i32) -> Result<Self> {
1371        match value {
1372            0 => Ok(CommentObjectType::Table),
1373            1 => Ok(CommentObjectType::Column),
1374            2 => Ok(CommentObjectType::Flow),
1375            _ => error::InvalidProtoMsgSnafu {
1376                err_msg: format!(
1377                    "Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
1378                    value
1379                ),
1380            }
1381            .fail(),
1382        }
1383    }
1384}
1385
1386// Proto conversions for CommentOnTask
1387impl TryFrom<PbCommentOnTask> for CommentOnTask {
1388    type Error = error::Error;
1389
1390    fn try_from(pb: PbCommentOnTask) -> Result<Self> {
1391        let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
1392            err_msg: "expected comment_on",
1393        })?;
1394
1395        Ok(CommentOnTask {
1396            catalog_name: comment_on.catalog_name,
1397            schema_name: comment_on.schema_name,
1398            object_type: comment_on.object_type.try_into()?,
1399            object_name: comment_on.object_name,
1400            column_name: if comment_on.column_name.is_empty() {
1401                None
1402            } else {
1403                Some(comment_on.column_name)
1404            },
1405            comment: if comment_on.comment.is_empty() {
1406                None
1407            } else {
1408                Some(comment_on.comment)
1409            },
1410            object_id: None,
1411        })
1412    }
1413}
1414
1415impl From<CommentOnTask> for PbCommentOnTask {
1416    fn from(task: CommentOnTask) -> Self {
1417        let pb_object_type: PbCommentObjectType = task.object_type.into();
1418        PbCommentOnTask {
1419            comment_on: Some(CommentOnExpr {
1420                catalog_name: task.catalog_name,
1421                schema_name: task.schema_name,
1422                object_type: pb_object_type as i32,
1423                object_name: task.object_name,
1424                column_name: task.column_name.unwrap_or_default(),
1425                comment: task.comment.unwrap_or_default(),
1426            }),
1427        }
1428    }
1429}
1430
1431#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
1432pub struct QueryContext {
1433    pub current_catalog: String,
1434    pub current_schema: String,
1435    pub timezone: String,
1436    pub extensions: HashMap<String, String>,
1437    pub channel: u8,
1438    /// Maps region id -> snapshot upper bound sequence for that region.
1439    #[serde(default)]
1440    pub snapshot_seqs: HashMap<u64, u64>,
1441    /// Maps region id -> minimal SST sequence allowed for that region.
1442    #[serde(default)]
1443    pub sst_min_sequences: HashMap<u64, u64>,
1444}
1445
1446impl QueryContext {
1447    /// Get the current catalog
1448    pub fn current_catalog(&self) -> &str {
1449        &self.current_catalog
1450    }
1451
1452    /// Get the current schema
1453    pub fn current_schema(&self) -> &str {
1454        &self.current_schema
1455    }
1456
1457    /// Get the timezone
1458    pub fn timezone(&self) -> &str {
1459        &self.timezone
1460    }
1461
1462    /// Get the extensions
1463    pub fn extensions(&self) -> &HashMap<String, String> {
1464        &self.extensions
1465    }
1466
1467    /// Get the channel
1468    pub fn channel(&self) -> u8 {
1469        self.channel
1470    }
1471
1472    pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
1473        &self.snapshot_seqs
1474    }
1475
1476    pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
1477        &self.sst_min_sequences
1478    }
1479}
1480
1481/// Lightweight query context for flow operations containing only essential fields.
1482/// This is a subset of QueryContext that includes only the fields actually needed
1483/// for flow creation and execution.
1484#[derive(Debug, Clone, Serialize, PartialEq)]
1485pub struct FlowQueryContext {
1486    /// Current catalog name used for flow metadata and execution.
1487    pub catalog: String,
1488    /// Current schema name used for table resolution during flow execution.
1489    pub schema: String,
1490    /// Timezone used for timestamp evaluation in the flow.
1491    pub timezone: String,
1492    /// Query extensions carried into flow execution.
1493    #[serde(default)]
1494    pub extensions: HashMap<String, String>,
1495    /// Request channel propagated from the original query context.
1496    #[serde(default)]
1497    pub channel: u8,
1498    /// Per-region snapshot upper bounds bound during query planning/execution.
1499    #[serde(default)]
1500    pub snapshot_seqs: HashMap<u64, u64>,
1501    /// Per-region lower SST scan bounds carried with the flow context.
1502    #[serde(default)]
1503    pub sst_min_sequences: HashMap<u64, u64>,
1504}
1505
1506impl<'de> Deserialize<'de> for FlowQueryContext {
1507    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
1508    where
1509        D: serde::Deserializer<'de>,
1510    {
1511        // Support both QueryContext format and FlowQueryContext format
1512        #[derive(Deserialize)]
1513        #[serde(untagged)]
1514        enum ContextCompat {
1515            Flow(FlowQueryContextHelper),
1516            Full(QueryContext),
1517        }
1518
1519        #[derive(Deserialize)]
1520        struct FlowQueryContextHelper {
1521            catalog: String,
1522            schema: String,
1523            timezone: String,
1524            #[serde(default)]
1525            extensions: HashMap<String, String>,
1526            #[serde(default)]
1527            channel: u8,
1528            #[serde(default)]
1529            snapshot_seqs: HashMap<u64, u64>,
1530            #[serde(default)]
1531            sst_min_sequences: HashMap<u64, u64>,
1532        }
1533
1534        match ContextCompat::deserialize(deserializer)? {
1535            ContextCompat::Flow(helper) => Ok(FlowQueryContext {
1536                catalog: helper.catalog,
1537                schema: helper.schema,
1538                timezone: helper.timezone,
1539                extensions: helper.extensions,
1540                channel: helper.channel,
1541                snapshot_seqs: helper.snapshot_seqs,
1542                sst_min_sequences: helper.sst_min_sequences,
1543            }),
1544            ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
1545        }
1546    }
1547}
1548
1549impl From<PbQueryContext> for QueryContext {
1550    fn from(pb_ctx: PbQueryContext) -> Self {
1551        let (snapshot_seqs, sst_min_sequences) = pb_ctx
1552            .snapshot_seqs
1553            .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
1554            .unwrap_or_default();
1555
1556        Self {
1557            current_catalog: pb_ctx.current_catalog,
1558            current_schema: pb_ctx.current_schema,
1559            timezone: pb_ctx.timezone,
1560            extensions: pb_ctx.extensions,
1561            channel: pb_ctx.channel as u8,
1562            snapshot_seqs,
1563            sst_min_sequences,
1564        }
1565    }
1566}
1567
1568impl From<QueryContext> for PbQueryContext {
1569    fn from(
1570        QueryContext {
1571            current_catalog,
1572            current_schema,
1573            timezone,
1574            extensions,
1575            channel,
1576            snapshot_seqs,
1577            sst_min_sequences,
1578        }: QueryContext,
1579    ) -> Self {
1580        PbQueryContext {
1581            current_catalog,
1582            current_schema,
1583            timezone,
1584            extensions,
1585            channel: channel as u32,
1586            snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
1587                api::v1::SnapshotSequences {
1588                    snapshot_seqs,
1589                    sst_min_sequences,
1590                },
1591            ),
1592            explain: None,
1593        }
1594    }
1595}
1596
1597impl From<QueryContext> for FlowQueryContext {
1598    fn from(ctx: QueryContext) -> Self {
1599        Self {
1600            catalog: ctx.current_catalog,
1601            schema: ctx.current_schema,
1602            timezone: ctx.timezone,
1603            extensions: ctx.extensions,
1604            channel: ctx.channel,
1605            snapshot_seqs: ctx.snapshot_seqs,
1606            sst_min_sequences: ctx.sst_min_sequences,
1607        }
1608    }
1609}
1610
1611impl From<FlowQueryContext> for QueryContext {
1612    fn from(flow_ctx: FlowQueryContext) -> Self {
1613        Self {
1614            current_catalog: flow_ctx.catalog,
1615            current_schema: flow_ctx.schema,
1616            timezone: flow_ctx.timezone,
1617            extensions: flow_ctx.extensions,
1618            channel: flow_ctx.channel,
1619            snapshot_seqs: flow_ctx.snapshot_seqs,
1620            sst_min_sequences: flow_ctx.sst_min_sequences,
1621        }
1622    }
1623}
1624
1625impl From<FlowQueryContext> for PbQueryContext {
1626    fn from(flow_ctx: FlowQueryContext) -> Self {
1627        let query_ctx: QueryContext = flow_ctx.into();
1628        query_ctx.into()
1629    }
1630}
1631
1632#[cfg(test)]
1633mod tests {
1634    use std::sync::Arc;
1635
1636    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
1637    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
1638    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
1639    use store_api::storage::ConcreteDataType;
1640    use table::metadata::{TableInfo, TableMeta, TableType};
1641    use table::test_util::table_info::test_table_info;
1642
1643    use super::{AlterTableTask, CreateTableTask, *};
1644
1645    #[test]
1646    fn test_basic_ser_de_create_table_task() {
1647        let schema = SchemaBuilder::default().build().unwrap();
1648        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
1649        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), table_info);
1650
1651        let output = serde_json::to_vec(&task).unwrap();
1652
1653        let de = serde_json::from_slice(&output).unwrap();
1654        assert_eq!(task, de);
1655    }
1656
1657    #[test]
1658    fn test_basic_ser_de_alter_table_task() {
1659        let task = AlterTableTask {
1660            alter_table: AlterTableExpr::default(),
1661        };
1662
1663        let output = serde_json::to_vec(&task).unwrap();
1664
1665        let de = serde_json::from_slice(&output).unwrap();
1666        assert_eq!(task, de);
1667    }
1668
1669    #[test]
1670    fn test_sort_columns() {
1671        // construct RawSchema
1672        let schema = Arc::new(Schema::new(vec![
1673            ColumnSchema::new(
1674                "column3".to_string(),
1675                ConcreteDataType::string_datatype(),
1676                true,
1677            ),
1678            ColumnSchema::new(
1679                "column1".to_string(),
1680                ConcreteDataType::timestamp_millisecond_datatype(),
1681                false,
1682            )
1683            .with_time_index(true),
1684            ColumnSchema::new(
1685                "column2".to_string(),
1686                ConcreteDataType::float64_datatype(),
1687                true,
1688            ),
1689        ]));
1690
1691        // construct RawTableMeta
1692        let meta = TableMeta {
1693            schema,
1694            primary_key_indices: vec![0],
1695            value_indices: vec![2],
1696            engine: METRIC_ENGINE_NAME.to_string(),
1697            next_column_id: 0,
1698            options: Default::default(),
1699            created_on: Default::default(),
1700            updated_on: Default::default(),
1701            partition_key_indices: Default::default(),
1702            column_ids: Default::default(),
1703        };
1704
1705        // construct TableInfo
1706        let raw_table_info = TableInfo {
1707            ident: Default::default(),
1708            meta,
1709            name: Default::default(),
1710            desc: Default::default(),
1711            catalog_name: Default::default(),
1712            schema_name: Default::default(),
1713            table_type: TableType::Base,
1714        };
1715
1716        // construct create table expr
1717        let create_table_expr = CreateTableExpr {
1718            column_defs: vec![
1719                ColumnDef {
1720                    name: "column3".to_string(),
1721                    semantic_type: SemanticType::Tag as i32,
1722                    ..Default::default()
1723                },
1724                ColumnDef {
1725                    name: "column1".to_string(),
1726                    semantic_type: SemanticType::Timestamp as i32,
1727                    ..Default::default()
1728                },
1729                ColumnDef {
1730                    name: "column2".to_string(),
1731                    semantic_type: SemanticType::Field as i32,
1732                    ..Default::default()
1733                },
1734            ],
1735            primary_keys: vec!["column3".to_string()],
1736            ..Default::default()
1737        };
1738
1739        let mut create_table_task =
1740            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);
1741
1742        // Call the sort_columns method
1743        create_table_task.sort_columns();
1744
1745        // Assert that the columns are sorted correctly
1746        assert_eq!(
1747            create_table_task.create_table.column_defs[0].name,
1748            "column1".to_string()
1749        );
1750        assert_eq!(
1751            create_table_task.create_table.column_defs[1].name,
1752            "column2".to_string()
1753        );
1754        assert_eq!(
1755            create_table_task.create_table.column_defs[2].name,
1756            "column3".to_string()
1757        );
1758
1759        // Assert that the table_info is updated correctly
1760        assert_eq!(
1761            create_table_task.table_info.meta.schema.timestamp_index(),
1762            Some(0)
1763        );
1764        assert_eq!(
1765            create_table_task.table_info.meta.primary_key_indices,
1766            vec![2]
1767        );
1768        assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
1769    }
1770
1771    #[test]
1772    fn test_flow_query_context_conversion_from_query_context() {
1773        use std::collections::HashMap;
1774        let mut extensions = HashMap::new();
1775        extensions.insert("key1".to_string(), "value1".to_string());
1776        extensions.insert("key2".to_string(), "value2".to_string());
1777
1778        let query_ctx = QueryContext {
1779            current_catalog: "test_catalog".to_string(),
1780            current_schema: "test_schema".to_string(),
1781            timezone: "UTC".to_string(),
1782            extensions,
1783            channel: 5,
1784            snapshot_seqs: HashMap::from([(10, 100)]),
1785            sst_min_sequences: HashMap::from([(10, 90)]),
1786        };
1787
1788        let flow_ctx: FlowQueryContext = query_ctx.into();
1789
1790        assert_eq!(flow_ctx.catalog, "test_catalog");
1791        assert_eq!(flow_ctx.schema, "test_schema");
1792        assert_eq!(flow_ctx.timezone, "UTC");
1793        assert_eq!(flow_ctx.channel, 5);
1794        assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
1795        assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
1796    }
1797
1798    #[test]
1799    fn test_flow_query_context_conversion_to_query_context() {
1800        let flow_ctx = FlowQueryContext {
1801            catalog: "prod_catalog".to_string(),
1802            schema: "public".to_string(),
1803            timezone: "America/New_York".to_string(),
1804            extensions: HashMap::from([("k".to_string(), "v".to_string())]),
1805            channel: 7,
1806            snapshot_seqs: HashMap::from([(11, 111)]),
1807            sst_min_sequences: HashMap::from([(11, 101)]),
1808        };
1809
1810        let query_ctx: QueryContext = flow_ctx.clone().into();
1811
1812        assert_eq!(query_ctx.current_catalog, "prod_catalog");
1813        assert_eq!(query_ctx.current_schema, "public");
1814        assert_eq!(query_ctx.timezone, "America/New_York");
1815        assert_eq!(
1816            query_ctx.extensions,
1817            HashMap::from([("k".to_string(), "v".to_string())])
1818        );
1819        assert_eq!(query_ctx.channel, 7);
1820        assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
1821        assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));
1822
1823        // Test roundtrip conversion
1824        let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
1825        assert_eq!(flow_ctx, flow_ctx_roundtrip);
1826    }
1827
1828    #[test]
1829    fn test_flow_query_context_serialization() {
1830        let flow_ctx = FlowQueryContext {
1831            catalog: "test_catalog".to_string(),
1832            schema: "test_schema".to_string(),
1833            timezone: "UTC".to_string(),
1834            extensions: HashMap::new(),
1835            channel: 0,
1836            snapshot_seqs: HashMap::new(),
1837            sst_min_sequences: HashMap::new(),
1838        };
1839
1840        let serialized = serde_json::to_string(&flow_ctx).unwrap();
1841        let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
1842
1843        assert_eq!(flow_ctx, deserialized);
1844
1845        // Verify JSON structure
1846        let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
1847        assert_eq!(json_value["catalog"], "test_catalog");
1848        assert_eq!(json_value["schema"], "test_schema");
1849        assert_eq!(json_value["timezone"], "UTC");
1850    }
1851
1852    #[test]
1853    fn test_flow_query_context_conversion_to_pb() {
1854        let flow_ctx = FlowQueryContext {
1855            catalog: "pb_catalog".to_string(),
1856            schema: "pb_schema".to_string(),
1857            timezone: "Asia/Tokyo".to_string(),
1858            extensions: HashMap::from([("x".to_string(), "y".to_string())]),
1859            channel: 6,
1860            snapshot_seqs: HashMap::from([(3, 30)]),
1861            sst_min_sequences: HashMap::from([(3, 21)]),
1862        };
1863
1864        let pb_ctx: PbQueryContext = flow_ctx.into();
1865
1866        assert_eq!(pb_ctx.current_catalog, "pb_catalog");
1867        assert_eq!(pb_ctx.current_schema, "pb_schema");
1868        assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
1869        assert_eq!(
1870            pb_ctx.extensions,
1871            HashMap::from([("x".to_string(), "y".to_string())])
1872        );
1873        assert_eq!(pb_ctx.channel, 6);
1874        assert_eq!(
1875            pb_ctx.snapshot_seqs,
1876            Some(api::v1::SnapshotSequences {
1877                snapshot_seqs: HashMap::from([(3, 30)]),
1878                sst_min_sequences: HashMap::from([(3, 21)]),
1879            })
1880        );
1881        assert!(pb_ctx.explain.is_none());
1882    }
1883
1884    #[test]
1885    fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
1886        let pb = PbQueryContext {
1887            current_catalog: "c1".to_string(),
1888            current_schema: "s1".to_string(),
1889            timezone: "UTC".to_string(),
1890            extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
1891            channel: 3,
1892            snapshot_seqs: Some(api::v1::SnapshotSequences {
1893                snapshot_seqs: HashMap::from([(1, 100)]),
1894                sst_min_sequences: HashMap::from([(1, 90)]),
1895            }),
1896            explain: None,
1897        };
1898
1899        let query_ctx: QueryContext = pb.clone().into();
1900        let pb_roundtrip: PbQueryContext = query_ctx.into();
1901
1902        assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
1903        assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
1904        assert_eq!(pb_roundtrip.timezone, pb.timezone);
1905        assert_eq!(pb_roundtrip.extensions, pb.extensions);
1906        assert_eq!(pb_roundtrip.channel, pb.channel);
1907        assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
1908    }
1909}