common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
22use api::v1::meta::ddl_task_request::Task;
23use api::v1::meta::{
24    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
25    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
26    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
27    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
28    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
29    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
30    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
31    DropViewTask as PbDropViewTask, Partition, ProcedureId,
32    TruncateTableTask as PbTruncateTableTask,
33};
34use api::v1::{
35    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
36    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
37    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
38};
39use base64::engine::general_purpose;
40use base64::Engine as _;
41use common_time::{DatabaseTimeToLive, Timezone};
42use prost::Message;
43use serde::{Deserialize, Serialize};
44use serde_with::{serde_as, DefaultOnNull};
45use session::context::{QueryContextBuilder, QueryContextRef};
46use snafu::{OptionExt, ResultExt};
47use table::metadata::{RawTableInfo, TableId};
48use table::table_name::TableName;
49use table::table_reference::TableReference;
50
51use crate::error::{
52    self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
53    Result,
54};
55use crate::key::FlowId;
56
/// DDL tasks
///
/// Each variant carries the payload of one kind of DDL operation. The trigger
/// variants only exist when the `enterprise` feature is enabled.
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a table.
    CreateTable(CreateTableTask),
    /// Drop a table.
    DropTable(DropTableTask),
    /// Alter a table.
    AlterTable(AlterTableTask),
    /// Truncate a table.
    TruncateTable(TruncateTableTask),
    /// Create several logical tables in one task.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Drop several logical tables in one task.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alter several logical tables in one task.
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter a database.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Drop a trigger (enterprise builds only).
    #[cfg(feature = "enterprise")]
    DropTrigger(trigger::DropTriggerTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
    /// Create a trigger (enterprise builds only).
    #[cfg(feature = "enterprise")]
    CreateTrigger(trigger::CreateTriggerTask),
}
79
80impl DdlTask {
81    /// Creates a [`DdlTask`] to create a flow.
82    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
83        DdlTask::CreateFlow(expr)
84    }
85
86    /// Creates a [`DdlTask`] to drop a flow.
87    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
88        DdlTask::DropFlow(expr)
89    }
90
91    /// Creates a [`DdlTask`] to drop a view.
92    pub fn new_drop_view(expr: DropViewTask) -> Self {
93        DdlTask::DropView(expr)
94    }
95
96    /// Creates a [`DdlTask`] to create a table.
97    pub fn new_create_table(
98        expr: CreateTableExpr,
99        partitions: Vec<Partition>,
100        table_info: RawTableInfo,
101    ) -> Self {
102        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
103    }
104
105    /// Creates a [`DdlTask`] to create several logical tables.
106    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
107        DdlTask::CreateLogicalTables(
108            table_data
109                .into_iter()
110                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
111                .collect(),
112        )
113    }
114
115    /// Creates a [`DdlTask`] to alter several logical tables.
116    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
117        DdlTask::AlterLogicalTables(
118            table_data
119                .into_iter()
120                .map(|alter_table| AlterTableTask { alter_table })
121                .collect(),
122        )
123    }
124
125    /// Creates a [`DdlTask`] to drop a table.
126    pub fn new_drop_table(
127        catalog: String,
128        schema: String,
129        table: String,
130        table_id: TableId,
131        drop_if_exists: bool,
132    ) -> Self {
133        DdlTask::DropTable(DropTableTask {
134            catalog,
135            schema,
136            table,
137            table_id,
138            drop_if_exists,
139        })
140    }
141
142    /// Creates a [`DdlTask`] to create a database.
143    pub fn new_create_database(
144        catalog: String,
145        schema: String,
146        create_if_not_exists: bool,
147        options: HashMap<String, String>,
148    ) -> Self {
149        DdlTask::CreateDatabase(CreateDatabaseTask {
150            catalog,
151            schema,
152            create_if_not_exists,
153            options,
154        })
155    }
156
157    /// Creates a [`DdlTask`] to drop a database.
158    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
159        DdlTask::DropDatabase(DropDatabaseTask {
160            catalog,
161            schema,
162            drop_if_exists,
163        })
164    }
165
166    /// Creates a [`DdlTask`] to alter a database.
167    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
168        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
169    }
170
171    /// Creates a [`DdlTask`] to alter a table.
172    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
173        DdlTask::AlterTable(AlterTableTask { alter_table })
174    }
175
176    /// Creates a [`DdlTask`] to truncate a table.
177    pub fn new_truncate_table(
178        catalog: String,
179        schema: String,
180        table: String,
181        table_id: TableId,
182    ) -> Self {
183        DdlTask::TruncateTable(TruncateTableTask {
184            catalog,
185            schema,
186            table,
187            table_id,
188        })
189    }
190
191    /// Creates a [`DdlTask`] to create a view.
192    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
193        DdlTask::CreateView(CreateViewTask {
194            create_view,
195            view_info,
196        })
197    }
198}
199
200impl TryFrom<Task> for DdlTask {
201    type Error = error::Error;
202    fn try_from(task: Task) -> Result<Self> {
203        match task {
204            Task::CreateTableTask(create_table) => {
205                Ok(DdlTask::CreateTable(create_table.try_into()?))
206            }
207            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
208            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
209            Task::TruncateTableTask(truncate_table) => {
210                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
211            }
212            Task::CreateTableTasks(create_tables) => {
213                let tasks = create_tables
214                    .tasks
215                    .into_iter()
216                    .map(|task| task.try_into())
217                    .collect::<Result<Vec<_>>>()?;
218
219                Ok(DdlTask::CreateLogicalTables(tasks))
220            }
221            Task::DropTableTasks(drop_tables) => {
222                let tasks = drop_tables
223                    .tasks
224                    .into_iter()
225                    .map(|task| task.try_into())
226                    .collect::<Result<Vec<_>>>()?;
227
228                Ok(DdlTask::DropLogicalTables(tasks))
229            }
230            Task::AlterTableTasks(alter_tables) => {
231                let tasks = alter_tables
232                    .tasks
233                    .into_iter()
234                    .map(|task| task.try_into())
235                    .collect::<Result<Vec<_>>>()?;
236
237                Ok(DdlTask::AlterLogicalTables(tasks))
238            }
239            Task::CreateDatabaseTask(create_database) => {
240                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
241            }
242            Task::DropDatabaseTask(drop_database) => {
243                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
244            }
245            Task::AlterDatabaseTask(alter_database) => {
246                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
247            }
248            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
249            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
250            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
251            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
252            Task::CreateTriggerTask(create_trigger) => {
253                #[cfg(feature = "enterprise")]
254                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
255                #[cfg(not(feature = "enterprise"))]
256                {
257                    let _ = create_trigger;
258                    crate::error::UnsupportedSnafu {
259                        operation: "create trigger",
260                    }
261                    .fail()
262                }
263            }
264            Task::DropTriggerTask(drop_trigger) => {
265                #[cfg(feature = "enterprise")]
266                return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
267                #[cfg(not(feature = "enterprise"))]
268                {
269                    let _ = drop_trigger;
270                    crate::error::UnsupportedSnafu {
271                        operation: "drop trigger",
272                    }
273                    .fail()
274                }
275            }
276        }
277    }
278}
279
/// A DDL task together with the query context it is submitted under.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context attached to the request.
    pub query_context: QueryContextRef,
    /// The DDL task to submit.
    pub task: DdlTask,
}
285
286impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
287    type Error = error::Error;
288
289    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
290        let task = match request.task {
291            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
292            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
293            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
294            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
295            DdlTask::CreateLogicalTables(tasks) => {
296                let tasks = tasks
297                    .into_iter()
298                    .map(|task| task.try_into())
299                    .collect::<Result<Vec<_>>>()?;
300
301                Task::CreateTableTasks(PbCreateTableTasks { tasks })
302            }
303            DdlTask::DropLogicalTables(tasks) => {
304                let tasks = tasks
305                    .into_iter()
306                    .map(|task| task.into())
307                    .collect::<Vec<_>>();
308
309                Task::DropTableTasks(PbDropTableTasks { tasks })
310            }
311            DdlTask::AlterLogicalTables(tasks) => {
312                let tasks = tasks
313                    .into_iter()
314                    .map(|task| task.try_into())
315                    .collect::<Result<Vec<_>>>()?;
316
317                Task::AlterTableTasks(PbAlterTableTasks { tasks })
318            }
319            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
320            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
321            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
322            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
323            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
324            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
325            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
326            #[cfg(feature = "enterprise")]
327            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
328            #[cfg(feature = "enterprise")]
329            DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
330        };
331
332        Ok(Self {
333            header: None,
334            query_context: Some((*request.query_context).clone().into()),
335            task: Some(task),
336        })
337    }
338}
339
/// Response to a submitted DDL task.
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Procedure id key; taken from the protobuf response's `pid`, empty when
    /// the `pid` is absent.
    pub key: Vec<u8>,
    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
346
347impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
348    type Error = error::Error;
349
350    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
351        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
352        Ok(Self {
353            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
354            table_ids,
355        })
356    }
357}
358
359impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
360    fn from(val: SubmitDdlTaskResponse) -> Self {
361        Self {
362            pid: Some(ProcedureId { key: val.key }),
363            table_ids: val
364                .table_ids
365                .into_iter()
366                .map(|id| api::v1::TableId { id })
367                .collect(),
368            ..Default::default()
369        }
370    }
371}
372
/// A `CREATE VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf `CREATE VIEW` expression.
    pub create_view: CreateViewExpr,
    /// Table info of the view; JSON-encoded when converted to protobuf.
    pub view_info: RawTableInfo,
}
379
380impl CreateViewTask {
381    /// Returns the [`TableReference`] of view.
382    pub fn table_ref(&self) -> TableReference {
383        TableReference {
384            catalog: &self.create_view.catalog_name,
385            schema: &self.create_view.schema_name,
386            table: &self.create_view.view_name,
387        }
388    }
389
390    /// Returns the encoded logical plan
391    pub fn raw_logical_plan(&self) -> &Vec<u8> {
392        &self.create_view.logical_plan
393    }
394
395    /// Returns the view definition in SQL
396    pub fn view_definition(&self) -> &str {
397        &self.create_view.definition
398    }
399
400    /// Returns the resolved table names in view's logical plan
401    pub fn table_names(&self) -> HashSet<TableName> {
402        self.create_view
403            .table_names
404            .iter()
405            .map(|t| t.clone().into())
406            .collect()
407    }
408
409    /// Returns the view's columns
410    pub fn columns(&self) -> &Vec<String> {
411        &self.create_view.columns
412    }
413
414    /// Returns the original logical plan's columns
415    pub fn plan_columns(&self) -> &Vec<String> {
416        &self.create_view.plan_columns
417    }
418}
419
420impl TryFrom<PbCreateViewTask> for CreateViewTask {
421    type Error = error::Error;
422
423    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
424        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
425
426        Ok(CreateViewTask {
427            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
428                err_msg: "expected create view",
429            })?,
430            view_info,
431        })
432    }
433}
434
435impl TryFrom<CreateViewTask> for PbCreateViewTask {
436    type Error = error::Error;
437
438    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
439        Ok(PbCreateViewTask {
440            create_view: Some(task.create_view),
441            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
442        })
443    }
444}
445
446impl Serialize for CreateViewTask {
447    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
448    where
449        S: serde::Serializer,
450    {
451        let view_info = serde_json::to_vec(&self.view_info)
452            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
453
454        let pb = PbCreateViewTask {
455            create_view: Some(self.create_view.clone()),
456            view_info,
457        };
458        let buf = pb.encode_to_vec();
459        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
460        serializer.serialize_str(&encoded)
461    }
462}
463
464impl<'de> Deserialize<'de> for CreateViewTask {
465    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
466    where
467        D: serde::Deserializer<'de>,
468    {
469        let encoded = String::deserialize(deserializer)?;
470        let buf = general_purpose::STANDARD_NO_PAD
471            .decode(encoded)
472            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
473        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
474            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
475
476        let expr = CreateViewTask::try_from(expr)
477            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
478
479        Ok(expr)
480    }
481}
482
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog name of the view.
    pub catalog: String,
    /// Schema name of the view.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// Id of the view.
    pub view_id: TableId,
    /// Mirrors the `drop_if_exists` flag of the protobuf drop-view expression.
    pub drop_if_exists: bool,
}
492
493impl DropViewTask {
494    /// Returns the [`TableReference`] of view.
495    pub fn table_ref(&self) -> TableReference {
496        TableReference {
497            catalog: &self.catalog,
498            schema: &self.schema,
499            table: &self.view,
500        }
501    }
502}
503
504impl TryFrom<PbDropViewTask> for DropViewTask {
505    type Error = error::Error;
506
507    fn try_from(pb: PbDropViewTask) -> Result<Self> {
508        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
509            err_msg: "expected drop view",
510        })?;
511
512        Ok(DropViewTask {
513            catalog: expr.catalog_name,
514            schema: expr.schema_name,
515            view: expr.view_name,
516            view_id: expr
517                .view_id
518                .context(error::InvalidProtoMsgSnafu {
519                    err_msg: "expected view_id",
520                })?
521                .id,
522            drop_if_exists: expr.drop_if_exists,
523        })
524    }
525}
526
527impl From<DropViewTask> for PbDropViewTask {
528    fn from(task: DropViewTask) -> Self {
529        PbDropViewTask {
530            drop_view: Some(DropViewExpr {
531                catalog_name: task.catalog,
532                schema_name: task.schema,
533                view_name: task.view,
534                view_id: Some(api::v1::TableId { id: task.view_id }),
535                drop_if_exists: task.drop_if_exists,
536            }),
537        }
538    }
539}
540
/// A `DROP TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Mirrors the `drop_if_exists` flag of the protobuf drop-table expression.
    /// Falls back to `false` when the field is missing from serialized input.
    #[serde(default)]
    pub drop_if_exists: bool,
}
550
551impl DropTableTask {
552    pub fn table_ref(&self) -> TableReference {
553        TableReference {
554            catalog: &self.catalog,
555            schema: &self.schema,
556            table: &self.table,
557        }
558    }
559
560    pub fn table_name(&self) -> TableName {
561        TableName {
562            catalog_name: self.catalog.to_string(),
563            schema_name: self.schema.to_string(),
564            table_name: self.table.to_string(),
565        }
566    }
567}
568
569impl TryFrom<PbDropTableTask> for DropTableTask {
570    type Error = error::Error;
571
572    fn try_from(pb: PbDropTableTask) -> Result<Self> {
573        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
574            err_msg: "expected drop table",
575        })?;
576
577        Ok(Self {
578            catalog: drop_table.catalog_name,
579            schema: drop_table.schema_name,
580            table: drop_table.table_name,
581            table_id: drop_table
582                .table_id
583                .context(error::InvalidProtoMsgSnafu {
584                    err_msg: "expected table_id",
585                })?
586                .id,
587            drop_if_exists: drop_table.drop_if_exists,
588        })
589    }
590}
591
592impl From<DropTableTask> for PbDropTableTask {
593    fn from(task: DropTableTask) -> Self {
594        PbDropTableTask {
595            drop_table: Some(DropTableExpr {
596                catalog_name: task.catalog,
597                schema_name: task.schema,
598                table_name: task.table,
599                table_id: Some(api::v1::TableId { id: task.table_id }),
600                drop_if_exists: task.drop_if_exists,
601            }),
602        }
603    }
604}
605
/// A `CREATE TABLE` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The protobuf `CREATE TABLE` expression.
    pub create_table: CreateTableExpr,
    /// Partitions of the table; empty for tasks built for logical tables.
    pub partitions: Vec<Partition>,
    /// Table info; JSON-encoded when converted to protobuf.
    pub table_info: RawTableInfo,
}
612
613impl TryFrom<PbCreateTableTask> for CreateTableTask {
614    type Error = error::Error;
615
616    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
617        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
618
619        Ok(CreateTableTask::new(
620            pb.create_table.context(error::InvalidProtoMsgSnafu {
621                err_msg: "expected create table",
622            })?,
623            pb.partitions,
624            table_info,
625        ))
626    }
627}
628
629impl TryFrom<CreateTableTask> for PbCreateTableTask {
630    type Error = error::Error;
631
632    fn try_from(task: CreateTableTask) -> Result<Self> {
633        Ok(PbCreateTableTask {
634            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
635            create_table: Some(task.create_table),
636            partitions: task.partitions,
637        })
638    }
639}
640
641impl CreateTableTask {
642    pub fn new(
643        expr: CreateTableExpr,
644        partitions: Vec<Partition>,
645        table_info: RawTableInfo,
646    ) -> CreateTableTask {
647        CreateTableTask {
648            create_table: expr,
649            partitions,
650            table_info,
651        }
652    }
653
654    pub fn table_name(&self) -> TableName {
655        let table = &self.create_table;
656
657        TableName {
658            catalog_name: table.catalog_name.to_string(),
659            schema_name: table.schema_name.to_string(),
660            table_name: table.table_name.to_string(),
661        }
662    }
663
664    pub fn table_ref(&self) -> TableReference {
665        let table = &self.create_table;
666
667        TableReference {
668            catalog: &table.catalog_name,
669            schema: &table.schema_name,
670            table: &table.table_name,
671        }
672    }
673
674    /// Sets the `table_info`'s table_id.
675    pub fn set_table_id(&mut self, table_id: TableId) {
676        self.table_info.ident.table_id = table_id;
677    }
678
679    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
680    ///
681    /// This function won't do any check or verification. Caller should
682    /// ensure this task is valid.
683    pub fn sort_columns(&mut self) {
684        // sort create table expr
685        // sort column_defs by name
686        self.create_table
687            .column_defs
688            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
689
690        self.table_info.sort_columns();
691    }
692}
693
694impl Serialize for CreateTableTask {
695    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
696    where
697        S: serde::Serializer,
698    {
699        let table_info = serde_json::to_vec(&self.table_info)
700            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
701
702        let pb = PbCreateTableTask {
703            create_table: Some(self.create_table.clone()),
704            partitions: self.partitions.clone(),
705            table_info,
706        };
707        let buf = pb.encode_to_vec();
708        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
709        serializer.serialize_str(&encoded)
710    }
711}
712
713impl<'de> Deserialize<'de> for CreateTableTask {
714    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
715    where
716        D: serde::Deserializer<'de>,
717    {
718        let encoded = String::deserialize(deserializer)?;
719        let buf = general_purpose::STANDARD_NO_PAD
720            .decode(encoded)
721            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
722        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
723            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
724
725        let expr = CreateTableTask::try_from(expr)
726            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
727
728        Ok(expr)
729    }
730}
731
/// An `ALTER TABLE` task wrapping the protobuf [`AlterTableExpr`].
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    /// The protobuf `ALTER TABLE` expression.
    pub alter_table: AlterTableExpr,
}
737
738impl AlterTableTask {
739    pub fn validate(&self) -> Result<()> {
740        self.alter_table
741            .kind
742            .as_ref()
743            .context(error::UnexpectedSnafu {
744                err_msg: "'kind' is absent",
745            })?;
746        Ok(())
747    }
748
749    pub fn table_ref(&self) -> TableReference {
750        TableReference {
751            catalog: &self.alter_table.catalog_name,
752            schema: &self.alter_table.schema_name,
753            table: &self.alter_table.table_name,
754        }
755    }
756
757    pub fn table_name(&self) -> TableName {
758        let table = &self.alter_table;
759
760        TableName {
761            catalog_name: table.catalog_name.to_string(),
762            schema_name: table.schema_name.to_string(),
763            table_name: table.table_name.to_string(),
764        }
765    }
766}
767
768impl TryFrom<PbAlterTableTask> for AlterTableTask {
769    type Error = error::Error;
770
771    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
772        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
773            err_msg: "expected alter_table",
774        })?;
775
776        Ok(AlterTableTask { alter_table })
777    }
778}
779
780impl TryFrom<AlterTableTask> for PbAlterTableTask {
781    type Error = error::Error;
782
783    fn try_from(task: AlterTableTask) -> Result<Self> {
784        Ok(PbAlterTableTask {
785            alter_table: Some(task.alter_table),
786        })
787    }
788}
789
790impl Serialize for AlterTableTask {
791    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
792    where
793        S: serde::Serializer,
794    {
795        let pb = PbAlterTableTask {
796            alter_table: Some(self.alter_table.clone()),
797        };
798        let buf = pb.encode_to_vec();
799        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
800        serializer.serialize_str(&encoded)
801    }
802}
803
804impl<'de> Deserialize<'de> for AlterTableTask {
805    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
806    where
807        D: serde::Deserializer<'de>,
808    {
809        let encoded = String::deserialize(deserializer)?;
810        let buf = general_purpose::STANDARD_NO_PAD
811            .decode(encoded)
812            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
813        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
814            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
815
816        let expr = AlterTableTask::try_from(expr)
817            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
818
819        Ok(expr)
820    }
821}
822
/// A `TRUNCATE TABLE` task.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
}
830
831impl TruncateTableTask {
832    pub fn table_ref(&self) -> TableReference {
833        TableReference {
834            catalog: &self.catalog,
835            schema: &self.schema,
836            table: &self.table,
837        }
838    }
839
840    pub fn table_name(&self) -> TableName {
841        TableName {
842            catalog_name: self.catalog.to_string(),
843            schema_name: self.schema.to_string(),
844            table_name: self.table.to_string(),
845        }
846    }
847}
848
849impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
850    type Error = error::Error;
851
852    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
853        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
854            err_msg: "expected truncate table",
855        })?;
856
857        Ok(Self {
858            catalog: truncate_table.catalog_name,
859            schema: truncate_table.schema_name,
860            table: truncate_table.table_name,
861            table_id: truncate_table
862                .table_id
863                .context(error::InvalidProtoMsgSnafu {
864                    err_msg: "expected table_id",
865                })?
866                .id,
867        })
868    }
869}
870
871impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
872    type Error = error::Error;
873
874    fn try_from(task: TruncateTableTask) -> Result<Self> {
875        Ok(PbTruncateTableTask {
876            truncate_table: Some(TruncateTableExpr {
877                catalog_name: task.catalog,
878                schema_name: task.schema,
879                table_name: task.table,
880                table_id: Some(api::v1::TableId { id: task.table_id }),
881            }),
882        })
883    }
884}
885
/// A `CREATE DATABASE` task.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    /// Catalog name; maps to `catalog_name` of [`CreateDatabaseExpr`].
    pub catalog: String,
    /// Schema name; maps to `schema_name` of [`CreateDatabaseExpr`].
    pub schema: String,
    /// Mirrors the `create_if_not_exists` flag of [`CreateDatabaseExpr`].
    pub create_if_not_exists: bool,
    /// Database options. A `null` value in serialized input deserializes to
    /// the default (empty) map.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
895
896impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
897    type Error = error::Error;
898
899    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
900        let CreateDatabaseExpr {
901            catalog_name,
902            schema_name,
903            create_if_not_exists,
904            options,
905        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
906            err_msg: "expected create database",
907        })?;
908
909        Ok(CreateDatabaseTask {
910            catalog: catalog_name,
911            schema: schema_name,
912            create_if_not_exists,
913            options,
914        })
915    }
916}
917
918impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
919    type Error = error::Error;
920
921    fn try_from(
922        CreateDatabaseTask {
923            catalog,
924            schema,
925            create_if_not_exists,
926            options,
927        }: CreateDatabaseTask,
928    ) -> Result<Self> {
929        Ok(PbCreateDatabaseTask {
930            create_database: Some(CreateDatabaseExpr {
931                catalog_name: catalog,
932                schema_name: schema,
933                create_if_not_exists,
934                options,
935            }),
936        })
937    }
938}
939
/// A task describing a database (schema) drop request.
///
/// Converts to and from [`PbDropDatabaseTask`]; the fields correspond
/// one-to-one to those of [`DropDatabaseExpr`].
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    /// Catalog name (`catalog_name` in the protobuf expression).
    pub catalog: String,
    /// Schema name (`schema_name` in the protobuf expression).
    pub schema: String,
    /// Carried over verbatim from the protobuf `drop_if_exists` flag.
    pub drop_if_exists: bool,
}
946
947impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
948    type Error = error::Error;
949
950    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
951        let DropDatabaseExpr {
952            catalog_name,
953            schema_name,
954            drop_if_exists,
955        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
956            err_msg: "expected drop database",
957        })?;
958
959        Ok(DropDatabaseTask {
960            catalog: catalog_name,
961            schema: schema_name,
962            drop_if_exists,
963        })
964    }
965}
966
967impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
968    type Error = error::Error;
969
970    fn try_from(
971        DropDatabaseTask {
972            catalog,
973            schema,
974            drop_if_exists,
975        }: DropDatabaseTask,
976    ) -> Result<Self> {
977        Ok(PbDropDatabaseTask {
978            drop_database: Some(DropDatabaseExpr {
979                catalog_name: catalog,
980                schema_name: schema,
981                drop_if_exists,
982            }),
983        })
984    }
985}
986
/// A task describing an alter-database request.
///
/// It is a thin wrapper around the protobuf [`AlterDatabaseExpr`], which is
/// stored as-is rather than being unpacked into separate fields.
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    /// The protobuf expression carried by this task.
    pub alter_expr: AlterDatabaseExpr,
}
991
992impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
993    type Error = error::Error;
994
995    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
996        Ok(PbAlterDatabaseTask {
997            task: Some(task.alter_expr),
998        })
999    }
1000}
1001
1002impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
1003    type Error = error::Error;
1004
1005    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
1006        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
1007            err_msg: "expected alter database",
1008        })?;
1009
1010        Ok(AlterDatabaseTask { alter_expr })
1011    }
1012}
1013
1014impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
1015    type Error = error::Error;
1016
1017    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1018        match pb {
1019            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1020                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1021                    options
1022                        .set_database_options
1023                        .into_iter()
1024                        .map(SetDatabaseOption::try_from)
1025                        .collect::<Result<Vec<_>>>()?,
1026                )))
1027            }
1028            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1029                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1030                    options
1031                        .keys
1032                        .iter()
1033                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1034                        .collect::<Result<Vec<_>>>()?,
1035                )),
1036            ),
1037        }
1038    }
1039}
1040
/// Key of the time-to-live database option. Incoming keys are lowercased
/// (ASCII) before being compared against it, so matching is case-insensitive.
const TTL_KEY: &str = "ttl";
1042
1043impl TryFrom<PbOption> for SetDatabaseOption {
1044    type Error = error::Error;
1045
1046    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1047        match key.to_ascii_lowercase().as_str() {
1048            TTL_KEY => {
1049                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1050                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1051
1052                Ok(SetDatabaseOption::Ttl(ttl))
1053            }
1054            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1055        }
1056    }
1057}
1058
/// A single, already validated database option to set.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// Sets the database time-to-live to the given value.
    Ttl(DatabaseTimeToLive),
}
1063
/// A single, already validated database option to unset.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Unsets the database time-to-live option.
    Ttl,
}
1068
1069impl TryFrom<&str> for UnsetDatabaseOption {
1070    type Error = error::Error;
1071
1072    fn try_from(key: &str) -> Result<Self> {
1073        match key.to_ascii_lowercase().as_str() {
1074            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1075            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1076        }
1077    }
1078}
1079
/// Newtype around the list of options to set in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1082
/// Newtype around the list of options to unset in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1085
/// The validated kind of an alter-database operation, produced from
/// [`PbAlterDatabaseKind`].
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    /// Set one or more database options.
    SetDatabaseOptions(SetDatabaseOptions),
    /// Unset one or more database options.
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1091
1092impl AlterDatabaseTask {
1093    pub fn catalog(&self) -> &str {
1094        &self.alter_expr.catalog_name
1095    }
1096
1097    pub fn schema(&self) -> &str {
1098        &self.alter_expr.catalog_name
1099    }
1100}
1101
/// A task describing a flow creation request.
///
/// Converts to and from [`PbCreateFlowTask`]; the fields correspond
/// one-to-one to those of [`CreateFlowExpr`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    /// Catalog name, carried over from the protobuf expression.
    pub catalog_name: String,
    /// Flow name, carried over from the protobuf expression.
    pub flow_name: String,
    /// Names of the source tables, converted from the protobuf table names.
    pub source_table_names: Vec<TableName>,
    /// Name of the sink table; required when converting from protobuf.
    pub sink_table_name: TableName,
    /// Carried over verbatim from the protobuf `or_replace` flag.
    pub or_replace: bool,
    /// Carried over verbatim from the protobuf `create_if_not_exists` flag.
    pub create_if_not_exists: bool,
    /// Duration in seconds. Data older than this duration will not be used.
    pub expire_after: Option<i64>,
    /// Comment text, carried over from the protobuf expression.
    pub comment: String,
    /// SQL text, carried over from the protobuf expression.
    pub sql: String,
    /// Flow options as key/value pairs.
    pub flow_options: HashMap<String, String>,
}
1117
1118impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1119    type Error = error::Error;
1120
1121    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1122        let CreateFlowExpr {
1123            catalog_name,
1124            flow_name,
1125            source_table_names,
1126            sink_table_name,
1127            or_replace,
1128            create_if_not_exists,
1129            expire_after,
1130            comment,
1131            sql,
1132            flow_options,
1133        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1134            err_msg: "expected create_flow",
1135        })?;
1136
1137        Ok(CreateFlowTask {
1138            catalog_name,
1139            flow_name,
1140            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1141            sink_table_name: sink_table_name
1142                .context(error::InvalidProtoMsgSnafu {
1143                    err_msg: "expected sink_table_name",
1144                })?
1145                .into(),
1146            or_replace,
1147            create_if_not_exists,
1148            expire_after: expire_after.map(|e| e.value),
1149            comment,
1150            sql,
1151            flow_options,
1152        })
1153    }
1154}
1155
1156impl From<CreateFlowTask> for PbCreateFlowTask {
1157    fn from(
1158        CreateFlowTask {
1159            catalog_name,
1160            flow_name,
1161            source_table_names,
1162            sink_table_name,
1163            or_replace,
1164            create_if_not_exists,
1165            expire_after,
1166            comment,
1167            sql,
1168            flow_options,
1169        }: CreateFlowTask,
1170    ) -> Self {
1171        PbCreateFlowTask {
1172            create_flow: Some(CreateFlowExpr {
1173                catalog_name,
1174                flow_name,
1175                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1176                sink_table_name: Some(sink_table_name.into()),
1177                or_replace,
1178                create_if_not_exists,
1179                expire_after: expire_after.map(|value| ExpireAfter { value }),
1180                comment,
1181                sql,
1182                flow_options,
1183            }),
1184        }
1185    }
1186}
1187
/// A task describing a flow drop request.
///
/// Converts to and from [`PbDropFlowTask`]; the fields correspond
/// one-to-one to those of [`DropFlowExpr`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    /// Catalog name, carried over from the protobuf expression.
    pub catalog_name: String,
    /// Flow name, carried over from the protobuf expression.
    pub flow_name: String,
    /// Id of the flow; required when converting from protobuf.
    pub flow_id: FlowId,
    /// Carried over verbatim from the protobuf `drop_if_exists` flag.
    pub drop_if_exists: bool,
}
1196
1197impl TryFrom<PbDropFlowTask> for DropFlowTask {
1198    type Error = error::Error;
1199
1200    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1201        let DropFlowExpr {
1202            catalog_name,
1203            flow_name,
1204            flow_id,
1205            drop_if_exists,
1206        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1207            err_msg: "expected drop_flow",
1208        })?;
1209        let flow_id = flow_id
1210            .context(error::InvalidProtoMsgSnafu {
1211                err_msg: "expected flow_id",
1212            })?
1213            .id;
1214        Ok(DropFlowTask {
1215            catalog_name,
1216            flow_name,
1217            flow_id,
1218            drop_if_exists,
1219        })
1220    }
1221}
1222
1223impl From<DropFlowTask> for PbDropFlowTask {
1224    fn from(
1225        DropFlowTask {
1226            catalog_name,
1227            flow_name,
1228            flow_id,
1229            drop_if_exists,
1230        }: DropFlowTask,
1231    ) -> Self {
1232        PbDropFlowTask {
1233            drop_flow: Some(DropFlowExpr {
1234                catalog_name,
1235                flow_name,
1236                flow_id: Some(api::v1::FlowId { id: flow_id }),
1237                drop_if_exists,
1238            }),
1239        }
1240    }
1241}
1242
/// A serializable subset of a session query context.
///
/// It is built from a `QueryContextRef` and can be turned back into a
/// `session::context::QueryContext` or into the protobuf [`PbQueryContext`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    /// Current catalog name.
    current_catalog: String,
    /// Current schema name.
    current_schema: String,
    /// Timezone in string form; parsed with `Timezone::from_tz_string` when
    /// converting back to a session context.
    timezone: String,
    /// Extension key/value pairs copied from the session context.
    extensions: HashMap<String, String>,
    /// The session channel stored as its numeric value narrowed to `u8`.
    channel: u8,
}
1251
1252impl From<QueryContextRef> for QueryContext {
1253    fn from(query_context: QueryContextRef) -> Self {
1254        QueryContext {
1255            current_catalog: query_context.current_catalog().to_string(),
1256            current_schema: query_context.current_schema().to_string(),
1257            timezone: query_context.timezone().to_string(),
1258            extensions: query_context.extensions(),
1259            channel: query_context.channel() as u8,
1260        }
1261    }
1262}
1263
1264impl TryFrom<QueryContext> for session::context::QueryContext {
1265    type Error = error::Error;
1266    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1267        Ok(QueryContextBuilder::default()
1268            .current_catalog(value.current_catalog)
1269            .current_schema(value.current_schema)
1270            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1271            .extensions(value.extensions)
1272            .channel((value.channel as u32).into())
1273            .build())
1274    }
1275}
1276
1277impl From<QueryContext> for PbQueryContext {
1278    fn from(
1279        QueryContext {
1280            current_catalog,
1281            current_schema,
1282            timezone,
1283            extensions,
1284            channel,
1285        }: QueryContext,
1286    ) -> Self {
1287        PbQueryContext {
1288            current_catalog,
1289            current_schema,
1290            timezone,
1291            extensions,
1292            channel: channel as u32,
1293            snapshot_seqs: None,
1294            explain: None,
1295        }
1296    }
1297}
1298
/// Unit tests for the serde round trips of the table tasks and for
/// `CreateTableTask::sort_columns`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask};

    /// A `CreateTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = SchemaBuilder::default().build().unwrap();
        let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema));
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            Vec::new(),
            RawTableInfo::from(table_info),
        );

        let output = serde_json::to_vec(&task).unwrap();

        // The target type of `de` is inferred from the comparison below.
        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// An `AlterTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let output = serde_json::to_vec(&task).unwrap();

        // The target type of `de` is inferred from the comparison below.
        let de = serde_json::from_slice(&output).unwrap();
        assert_eq!(task, de);
    }

    /// `sort_columns` must reorder the column definitions and keep the index
    /// fields of the table info consistent with the new order.
    #[test]
    fn test_sort_columns() {
        // Construct the RawSchema. The columns are listed as column3, column1,
        // column2, i.e. not in the order expected after sorting.
        let raw_schema = RawSchema {
            column_schemas: vec![
                ColumnSchema::new(
                    "column3".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                ColumnSchema::new(
                    "column1".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                )
                .with_time_index(true),
                ColumnSchema::new(
                    "column2".to_string(),
                    ConcreteDataType::float64_datatype(),
                    true,
                ),
            ],
            // column1 (the time index) sits at position 1 before sorting.
            timestamp_index: Some(1),
            version: 0,
        };

        // Construct the RawTableMeta. Index 0 is column3 (primary key) and
        // index 2 is column2 (value) in the unsorted schema above.
        let raw_table_meta = RawTableMeta {
            schema: raw_schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            partition_key_indices: Default::default(),
        };

        // Construct the RawTableInfo; everything but the meta is defaulted.
        let raw_table_info = RawTableInfo {
            ident: Default::default(),
            meta: raw_table_meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // Construct the create table expr with the same (unsorted) column
        // order as the schema.
        let create_table_expr = CreateTableExpr {
            column_defs: vec![
                ColumnDef {
                    name: "column3".to_string(),
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column1".to_string(),
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnDef {
                    name: "column2".to_string(),
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut create_table_task =
            CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info);

        // Call the sort_columns method
        create_table_task.sort_columns();

        // Assert that the columns are sorted correctly: column1, column2, column3.
        assert_eq!(
            create_table_task.create_table.column_defs[0].name,
            "column1".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[1].name,
            "column2".to_string()
        );
        assert_eq!(
            create_table_task.create_table.column_defs[2].name,
            "column3".to_string()
        );

        // Assert that the table_info is updated correctly: every index must
        // point at the same column as before, at its new position.
        assert_eq!(
            create_table_task.table_info.meta.schema.timestamp_index,
            Some(0)
        );
        assert_eq!(
            create_table_task.table_info.meta.primary_key_indices,
            vec![2]
        );
        assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
    }
}