common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(feature = "enterprise")]
16pub mod trigger;
17
18use std::collections::{HashMap, HashSet};
19use std::result;
20
21use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
22use api::v1::meta::ddl_task_request::Task;
23use api::v1::meta::{
24    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
25    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
26    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
27    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
28    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
29    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
30    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
31    DropViewTask as PbDropViewTask, Partition, ProcedureId,
32    TruncateTableTask as PbTruncateTableTask,
33};
34use api::v1::{
35    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
36    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
37    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
38};
39use base64::engine::general_purpose;
40use base64::Engine as _;
41use common_time::{DatabaseTimeToLive, Timezone};
42use prost::Message;
43use serde::{Deserialize, Serialize};
44use serde_with::{serde_as, DefaultOnNull};
45use session::context::{QueryContextBuilder, QueryContextRef};
46use snafu::{OptionExt, ResultExt};
47use table::metadata::{RawTableInfo, TableId};
48use table::table_name::TableName;
49use table::table_reference::TableReference;
50
51use crate::error::{
52    self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
53    Result,
54};
55use crate::key::FlowId;
56
57/// DDL tasks
58#[derive(Debug, Clone)]
59pub enum DdlTask {
60    CreateTable(CreateTableTask),
61    DropTable(DropTableTask),
62    AlterTable(AlterTableTask),
63    TruncateTable(TruncateTableTask),
64    CreateLogicalTables(Vec<CreateTableTask>),
65    DropLogicalTables(Vec<DropTableTask>),
66    AlterLogicalTables(Vec<AlterTableTask>),
67    CreateDatabase(CreateDatabaseTask),
68    DropDatabase(DropDatabaseTask),
69    AlterDatabase(AlterDatabaseTask),
70    CreateFlow(CreateFlowTask),
71    DropFlow(DropFlowTask),
72    CreateView(CreateViewTask),
73    DropView(DropViewTask),
74    #[cfg(feature = "enterprise")]
75    CreateTrigger(trigger::CreateTriggerTask),
76}
77
78impl DdlTask {
79    /// Creates a [`DdlTask`] to create a flow.
80    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
81        DdlTask::CreateFlow(expr)
82    }
83
84    /// Creates a [`DdlTask`] to drop a flow.
85    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
86        DdlTask::DropFlow(expr)
87    }
88
89    /// Creates a [`DdlTask`] to drop a view.
90    pub fn new_drop_view(expr: DropViewTask) -> Self {
91        DdlTask::DropView(expr)
92    }
93
94    /// Creates a [`DdlTask`] to create a table.
95    pub fn new_create_table(
96        expr: CreateTableExpr,
97        partitions: Vec<Partition>,
98        table_info: RawTableInfo,
99    ) -> Self {
100        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
101    }
102
103    /// Creates a [`DdlTask`] to create several logical tables.
104    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
105        DdlTask::CreateLogicalTables(
106            table_data
107                .into_iter()
108                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
109                .collect(),
110        )
111    }
112
113    /// Creates a [`DdlTask`] to alter several logical tables.
114    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
115        DdlTask::AlterLogicalTables(
116            table_data
117                .into_iter()
118                .map(|alter_table| AlterTableTask { alter_table })
119                .collect(),
120        )
121    }
122
123    /// Creates a [`DdlTask`] to drop a table.
124    pub fn new_drop_table(
125        catalog: String,
126        schema: String,
127        table: String,
128        table_id: TableId,
129        drop_if_exists: bool,
130    ) -> Self {
131        DdlTask::DropTable(DropTableTask {
132            catalog,
133            schema,
134            table,
135            table_id,
136            drop_if_exists,
137        })
138    }
139
140    /// Creates a [`DdlTask`] to create a database.
141    pub fn new_create_database(
142        catalog: String,
143        schema: String,
144        create_if_not_exists: bool,
145        options: HashMap<String, String>,
146    ) -> Self {
147        DdlTask::CreateDatabase(CreateDatabaseTask {
148            catalog,
149            schema,
150            create_if_not_exists,
151            options,
152        })
153    }
154
155    /// Creates a [`DdlTask`] to drop a database.
156    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
157        DdlTask::DropDatabase(DropDatabaseTask {
158            catalog,
159            schema,
160            drop_if_exists,
161        })
162    }
163
164    /// Creates a [`DdlTask`] to alter a database.
165    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
166        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
167    }
168
169    /// Creates a [`DdlTask`] to alter a table.
170    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
171        DdlTask::AlterTable(AlterTableTask { alter_table })
172    }
173
174    /// Creates a [`DdlTask`] to truncate a table.
175    pub fn new_truncate_table(
176        catalog: String,
177        schema: String,
178        table: String,
179        table_id: TableId,
180    ) -> Self {
181        DdlTask::TruncateTable(TruncateTableTask {
182            catalog,
183            schema,
184            table,
185            table_id,
186        })
187    }
188
189    /// Creates a [`DdlTask`] to create a view.
190    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
191        DdlTask::CreateView(CreateViewTask {
192            create_view,
193            view_info,
194        })
195    }
196}
197
198impl TryFrom<Task> for DdlTask {
199    type Error = error::Error;
200    fn try_from(task: Task) -> Result<Self> {
201        match task {
202            Task::CreateTableTask(create_table) => {
203                Ok(DdlTask::CreateTable(create_table.try_into()?))
204            }
205            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
206            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
207            Task::TruncateTableTask(truncate_table) => {
208                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
209            }
210            Task::CreateTableTasks(create_tables) => {
211                let tasks = create_tables
212                    .tasks
213                    .into_iter()
214                    .map(|task| task.try_into())
215                    .collect::<Result<Vec<_>>>()?;
216
217                Ok(DdlTask::CreateLogicalTables(tasks))
218            }
219            Task::DropTableTasks(drop_tables) => {
220                let tasks = drop_tables
221                    .tasks
222                    .into_iter()
223                    .map(|task| task.try_into())
224                    .collect::<Result<Vec<_>>>()?;
225
226                Ok(DdlTask::DropLogicalTables(tasks))
227            }
228            Task::AlterTableTasks(alter_tables) => {
229                let tasks = alter_tables
230                    .tasks
231                    .into_iter()
232                    .map(|task| task.try_into())
233                    .collect::<Result<Vec<_>>>()?;
234
235                Ok(DdlTask::AlterLogicalTables(tasks))
236            }
237            Task::CreateDatabaseTask(create_database) => {
238                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
239            }
240            Task::DropDatabaseTask(drop_database) => {
241                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
242            }
243            Task::AlterDatabaseTask(alter_database) => {
244                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
245            }
246            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
247            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
248            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
249            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
250            Task::CreateTriggerTask(create_trigger) => {
251                #[cfg(feature = "enterprise")]
252                return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
253                #[cfg(not(feature = "enterprise"))]
254                {
255                    let _ = create_trigger;
256                    crate::error::UnsupportedSnafu {
257                        operation: "create trigger",
258                    }
259                    .fail()
260                }
261            }
262        }
263    }
264}
265
266#[derive(Clone)]
267pub struct SubmitDdlTaskRequest {
268    pub query_context: QueryContextRef,
269    pub task: DdlTask,
270}
271
272impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
273    type Error = error::Error;
274
275    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
276        let task = match request.task {
277            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
278            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
279            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
280            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
281            DdlTask::CreateLogicalTables(tasks) => {
282                let tasks = tasks
283                    .into_iter()
284                    .map(|task| task.try_into())
285                    .collect::<Result<Vec<_>>>()?;
286
287                Task::CreateTableTasks(PbCreateTableTasks { tasks })
288            }
289            DdlTask::DropLogicalTables(tasks) => {
290                let tasks = tasks
291                    .into_iter()
292                    .map(|task| task.into())
293                    .collect::<Vec<_>>();
294
295                Task::DropTableTasks(PbDropTableTasks { tasks })
296            }
297            DdlTask::AlterLogicalTables(tasks) => {
298                let tasks = tasks
299                    .into_iter()
300                    .map(|task| task.try_into())
301                    .collect::<Result<Vec<_>>>()?;
302
303                Task::AlterTableTasks(PbAlterTableTasks { tasks })
304            }
305            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
306            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
307            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
308            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
309            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
310            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
311            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
312            #[cfg(feature = "enterprise")]
313            DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
314        };
315
316        Ok(Self {
317            header: None,
318            query_context: Some((*request.query_context).clone().into()),
319            task: Some(task),
320        })
321    }
322}
323
324#[derive(Debug, Default)]
325pub struct SubmitDdlTaskResponse {
326    pub key: Vec<u8>,
327    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
328    pub table_ids: Vec<TableId>,
329}
330
331impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
332    type Error = error::Error;
333
334    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
335        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
336        Ok(Self {
337            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
338            table_ids,
339        })
340    }
341}
342
343impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
344    fn from(val: SubmitDdlTaskResponse) -> Self {
345        Self {
346            pid: Some(ProcedureId { key: val.key }),
347            table_ids: val
348                .table_ids
349                .into_iter()
350                .map(|id| api::v1::TableId { id })
351                .collect(),
352            ..Default::default()
353        }
354    }
355}
356
357/// A `CREATE VIEW` task.
358#[derive(Debug, PartialEq, Clone)]
359pub struct CreateViewTask {
360    pub create_view: CreateViewExpr,
361    pub view_info: RawTableInfo,
362}
363
364impl CreateViewTask {
365    /// Returns the [`TableReference`] of view.
366    pub fn table_ref(&self) -> TableReference {
367        TableReference {
368            catalog: &self.create_view.catalog_name,
369            schema: &self.create_view.schema_name,
370            table: &self.create_view.view_name,
371        }
372    }
373
374    /// Returns the encoded logical plan
375    pub fn raw_logical_plan(&self) -> &Vec<u8> {
376        &self.create_view.logical_plan
377    }
378
379    /// Returns the view definition in SQL
380    pub fn view_definition(&self) -> &str {
381        &self.create_view.definition
382    }
383
384    /// Returns the resolved table names in view's logical plan
385    pub fn table_names(&self) -> HashSet<TableName> {
386        self.create_view
387            .table_names
388            .iter()
389            .map(|t| t.clone().into())
390            .collect()
391    }
392
393    /// Returns the view's columns
394    pub fn columns(&self) -> &Vec<String> {
395        &self.create_view.columns
396    }
397
398    /// Returns the original logical plan's columns
399    pub fn plan_columns(&self) -> &Vec<String> {
400        &self.create_view.plan_columns
401    }
402}
403
404impl TryFrom<PbCreateViewTask> for CreateViewTask {
405    type Error = error::Error;
406
407    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
408        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
409
410        Ok(CreateViewTask {
411            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
412                err_msg: "expected create view",
413            })?,
414            view_info,
415        })
416    }
417}
418
419impl TryFrom<CreateViewTask> for PbCreateViewTask {
420    type Error = error::Error;
421
422    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
423        Ok(PbCreateViewTask {
424            create_view: Some(task.create_view),
425            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
426        })
427    }
428}
429
430impl Serialize for CreateViewTask {
431    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
432    where
433        S: serde::Serializer,
434    {
435        let view_info = serde_json::to_vec(&self.view_info)
436            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
437
438        let pb = PbCreateViewTask {
439            create_view: Some(self.create_view.clone()),
440            view_info,
441        };
442        let buf = pb.encode_to_vec();
443        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
444        serializer.serialize_str(&encoded)
445    }
446}
447
448impl<'de> Deserialize<'de> for CreateViewTask {
449    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
450    where
451        D: serde::Deserializer<'de>,
452    {
453        let encoded = String::deserialize(deserializer)?;
454        let buf = general_purpose::STANDARD_NO_PAD
455            .decode(encoded)
456            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
457        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
458            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
459
460        let expr = CreateViewTask::try_from(expr)
461            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
462
463        Ok(expr)
464    }
465}
466
467/// A `DROP VIEW` task.
468#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
469pub struct DropViewTask {
470    pub catalog: String,
471    pub schema: String,
472    pub view: String,
473    pub view_id: TableId,
474    pub drop_if_exists: bool,
475}
476
477impl DropViewTask {
478    /// Returns the [`TableReference`] of view.
479    pub fn table_ref(&self) -> TableReference {
480        TableReference {
481            catalog: &self.catalog,
482            schema: &self.schema,
483            table: &self.view,
484        }
485    }
486}
487
488impl TryFrom<PbDropViewTask> for DropViewTask {
489    type Error = error::Error;
490
491    fn try_from(pb: PbDropViewTask) -> Result<Self> {
492        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
493            err_msg: "expected drop view",
494        })?;
495
496        Ok(DropViewTask {
497            catalog: expr.catalog_name,
498            schema: expr.schema_name,
499            view: expr.view_name,
500            view_id: expr
501                .view_id
502                .context(error::InvalidProtoMsgSnafu {
503                    err_msg: "expected view_id",
504                })?
505                .id,
506            drop_if_exists: expr.drop_if_exists,
507        })
508    }
509}
510
511impl From<DropViewTask> for PbDropViewTask {
512    fn from(task: DropViewTask) -> Self {
513        PbDropViewTask {
514            drop_view: Some(DropViewExpr {
515                catalog_name: task.catalog,
516                schema_name: task.schema,
517                view_name: task.view,
518                view_id: Some(api::v1::TableId { id: task.view_id }),
519                drop_if_exists: task.drop_if_exists,
520            }),
521        }
522    }
523}
524
525#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
526pub struct DropTableTask {
527    pub catalog: String,
528    pub schema: String,
529    pub table: String,
530    pub table_id: TableId,
531    #[serde(default)]
532    pub drop_if_exists: bool,
533}
534
535impl DropTableTask {
536    pub fn table_ref(&self) -> TableReference {
537        TableReference {
538            catalog: &self.catalog,
539            schema: &self.schema,
540            table: &self.table,
541        }
542    }
543
544    pub fn table_name(&self) -> TableName {
545        TableName {
546            catalog_name: self.catalog.to_string(),
547            schema_name: self.schema.to_string(),
548            table_name: self.table.to_string(),
549        }
550    }
551}
552
553impl TryFrom<PbDropTableTask> for DropTableTask {
554    type Error = error::Error;
555
556    fn try_from(pb: PbDropTableTask) -> Result<Self> {
557        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
558            err_msg: "expected drop table",
559        })?;
560
561        Ok(Self {
562            catalog: drop_table.catalog_name,
563            schema: drop_table.schema_name,
564            table: drop_table.table_name,
565            table_id: drop_table
566                .table_id
567                .context(error::InvalidProtoMsgSnafu {
568                    err_msg: "expected table_id",
569                })?
570                .id,
571            drop_if_exists: drop_table.drop_if_exists,
572        })
573    }
574}
575
576impl From<DropTableTask> for PbDropTableTask {
577    fn from(task: DropTableTask) -> Self {
578        PbDropTableTask {
579            drop_table: Some(DropTableExpr {
580                catalog_name: task.catalog,
581                schema_name: task.schema,
582                table_name: task.table,
583                table_id: Some(api::v1::TableId { id: task.table_id }),
584                drop_if_exists: task.drop_if_exists,
585            }),
586        }
587    }
588}
589
590#[derive(Debug, PartialEq, Clone)]
591pub struct CreateTableTask {
592    pub create_table: CreateTableExpr,
593    pub partitions: Vec<Partition>,
594    pub table_info: RawTableInfo,
595}
596
597impl TryFrom<PbCreateTableTask> for CreateTableTask {
598    type Error = error::Error;
599
600    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
601        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
602
603        Ok(CreateTableTask::new(
604            pb.create_table.context(error::InvalidProtoMsgSnafu {
605                err_msg: "expected create table",
606            })?,
607            pb.partitions,
608            table_info,
609        ))
610    }
611}
612
613impl TryFrom<CreateTableTask> for PbCreateTableTask {
614    type Error = error::Error;
615
616    fn try_from(task: CreateTableTask) -> Result<Self> {
617        Ok(PbCreateTableTask {
618            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
619            create_table: Some(task.create_table),
620            partitions: task.partitions,
621        })
622    }
623}
624
625impl CreateTableTask {
626    pub fn new(
627        expr: CreateTableExpr,
628        partitions: Vec<Partition>,
629        table_info: RawTableInfo,
630    ) -> CreateTableTask {
631        CreateTableTask {
632            create_table: expr,
633            partitions,
634            table_info,
635        }
636    }
637
638    pub fn table_name(&self) -> TableName {
639        let table = &self.create_table;
640
641        TableName {
642            catalog_name: table.catalog_name.to_string(),
643            schema_name: table.schema_name.to_string(),
644            table_name: table.table_name.to_string(),
645        }
646    }
647
648    pub fn table_ref(&self) -> TableReference {
649        let table = &self.create_table;
650
651        TableReference {
652            catalog: &table.catalog_name,
653            schema: &table.schema_name,
654            table: &table.table_name,
655        }
656    }
657
658    /// Sets the `table_info`'s table_id.
659    pub fn set_table_id(&mut self, table_id: TableId) {
660        self.table_info.ident.table_id = table_id;
661    }
662
663    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
664    ///
665    /// This function won't do any check or verification. Caller should
666    /// ensure this task is valid.
667    pub fn sort_columns(&mut self) {
668        // sort create table expr
669        // sort column_defs by name
670        self.create_table
671            .column_defs
672            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
673
674        self.table_info.sort_columns();
675    }
676}
677
678impl Serialize for CreateTableTask {
679    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
680    where
681        S: serde::Serializer,
682    {
683        let table_info = serde_json::to_vec(&self.table_info)
684            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
685
686        let pb = PbCreateTableTask {
687            create_table: Some(self.create_table.clone()),
688            partitions: self.partitions.clone(),
689            table_info,
690        };
691        let buf = pb.encode_to_vec();
692        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
693        serializer.serialize_str(&encoded)
694    }
695}
696
697impl<'de> Deserialize<'de> for CreateTableTask {
698    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
699    where
700        D: serde::Deserializer<'de>,
701    {
702        let encoded = String::deserialize(deserializer)?;
703        let buf = general_purpose::STANDARD_NO_PAD
704            .decode(encoded)
705            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
706        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
707            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
708
709        let expr = CreateTableTask::try_from(expr)
710            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
711
712        Ok(expr)
713    }
714}
715
716#[derive(Debug, PartialEq, Clone)]
717pub struct AlterTableTask {
718    // TODO(CookiePieWw): Replace proto struct with user-defined struct
719    pub alter_table: AlterTableExpr,
720}
721
722impl AlterTableTask {
723    pub fn validate(&self) -> Result<()> {
724        self.alter_table
725            .kind
726            .as_ref()
727            .context(error::UnexpectedSnafu {
728                err_msg: "'kind' is absent",
729            })?;
730        Ok(())
731    }
732
733    pub fn table_ref(&self) -> TableReference {
734        TableReference {
735            catalog: &self.alter_table.catalog_name,
736            schema: &self.alter_table.schema_name,
737            table: &self.alter_table.table_name,
738        }
739    }
740
741    pub fn table_name(&self) -> TableName {
742        let table = &self.alter_table;
743
744        TableName {
745            catalog_name: table.catalog_name.to_string(),
746            schema_name: table.schema_name.to_string(),
747            table_name: table.table_name.to_string(),
748        }
749    }
750}
751
752impl TryFrom<PbAlterTableTask> for AlterTableTask {
753    type Error = error::Error;
754
755    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
756        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
757            err_msg: "expected alter_table",
758        })?;
759
760        Ok(AlterTableTask { alter_table })
761    }
762}
763
764impl TryFrom<AlterTableTask> for PbAlterTableTask {
765    type Error = error::Error;
766
767    fn try_from(task: AlterTableTask) -> Result<Self> {
768        Ok(PbAlterTableTask {
769            alter_table: Some(task.alter_table),
770        })
771    }
772}
773
774impl Serialize for AlterTableTask {
775    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
776    where
777        S: serde::Serializer,
778    {
779        let pb = PbAlterTableTask {
780            alter_table: Some(self.alter_table.clone()),
781        };
782        let buf = pb.encode_to_vec();
783        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
784        serializer.serialize_str(&encoded)
785    }
786}
787
788impl<'de> Deserialize<'de> for AlterTableTask {
789    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
790    where
791        D: serde::Deserializer<'de>,
792    {
793        let encoded = String::deserialize(deserializer)?;
794        let buf = general_purpose::STANDARD_NO_PAD
795            .decode(encoded)
796            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
797        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
798            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
799
800        let expr = AlterTableTask::try_from(expr)
801            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
802
803        Ok(expr)
804    }
805}
806
807#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
808pub struct TruncateTableTask {
809    pub catalog: String,
810    pub schema: String,
811    pub table: String,
812    pub table_id: TableId,
813}
814
815impl TruncateTableTask {
816    pub fn table_ref(&self) -> TableReference {
817        TableReference {
818            catalog: &self.catalog,
819            schema: &self.schema,
820            table: &self.table,
821        }
822    }
823
824    pub fn table_name(&self) -> TableName {
825        TableName {
826            catalog_name: self.catalog.to_string(),
827            schema_name: self.schema.to_string(),
828            table_name: self.table.to_string(),
829        }
830    }
831}
832
833impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
834    type Error = error::Error;
835
836    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
837        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
838            err_msg: "expected truncate table",
839        })?;
840
841        Ok(Self {
842            catalog: truncate_table.catalog_name,
843            schema: truncate_table.schema_name,
844            table: truncate_table.table_name,
845            table_id: truncate_table
846                .table_id
847                .context(error::InvalidProtoMsgSnafu {
848                    err_msg: "expected table_id",
849                })?
850                .id,
851        })
852    }
853}
854
855impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
856    type Error = error::Error;
857
858    fn try_from(task: TruncateTableTask) -> Result<Self> {
859        Ok(PbTruncateTableTask {
860            truncate_table: Some(TruncateTableExpr {
861                catalog_name: task.catalog,
862                schema_name: task.schema,
863                table_name: task.table,
864                table_id: Some(api::v1::TableId { id: task.table_id }),
865            }),
866        })
867    }
868}
869
870#[serde_as]
871#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
872pub struct CreateDatabaseTask {
873    pub catalog: String,
874    pub schema: String,
875    pub create_if_not_exists: bool,
876    #[serde_as(deserialize_as = "DefaultOnNull")]
877    pub options: HashMap<String, String>,
878}
879
880impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
881    type Error = error::Error;
882
883    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
884        let CreateDatabaseExpr {
885            catalog_name,
886            schema_name,
887            create_if_not_exists,
888            options,
889        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
890            err_msg: "expected create database",
891        })?;
892
893        Ok(CreateDatabaseTask {
894            catalog: catalog_name,
895            schema: schema_name,
896            create_if_not_exists,
897            options,
898        })
899    }
900}
901
902impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
903    type Error = error::Error;
904
905    fn try_from(
906        CreateDatabaseTask {
907            catalog,
908            schema,
909            create_if_not_exists,
910            options,
911        }: CreateDatabaseTask,
912    ) -> Result<Self> {
913        Ok(PbCreateDatabaseTask {
914            create_database: Some(CreateDatabaseExpr {
915                catalog_name: catalog,
916                schema_name: schema,
917                create_if_not_exists,
918                options,
919            }),
920        })
921    }
922}
923
924#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
925pub struct DropDatabaseTask {
926    pub catalog: String,
927    pub schema: String,
928    pub drop_if_exists: bool,
929}
930
931impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
932    type Error = error::Error;
933
934    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
935        let DropDatabaseExpr {
936            catalog_name,
937            schema_name,
938            drop_if_exists,
939        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
940            err_msg: "expected drop database",
941        })?;
942
943        Ok(DropDatabaseTask {
944            catalog: catalog_name,
945            schema: schema_name,
946            drop_if_exists,
947        })
948    }
949}
950
951impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
952    type Error = error::Error;
953
954    fn try_from(
955        DropDatabaseTask {
956            catalog,
957            schema,
958            drop_if_exists,
959        }: DropDatabaseTask,
960    ) -> Result<Self> {
961        Ok(PbDropDatabaseTask {
962            drop_database: Some(DropDatabaseExpr {
963                catalog_name: catalog,
964                schema_name: schema,
965                drop_if_exists,
966            }),
967        })
968    }
969}
970
971#[derive(Debug, PartialEq, Clone)]
972pub struct AlterDatabaseTask {
973    pub alter_expr: AlterDatabaseExpr,
974}
975
976impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
977    type Error = error::Error;
978
979    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
980        Ok(PbAlterDatabaseTask {
981            task: Some(task.alter_expr),
982        })
983    }
984}
985
986impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
987    type Error = error::Error;
988
989    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
990        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
991            err_msg: "expected alter database",
992        })?;
993
994        Ok(AlterDatabaseTask { alter_expr })
995    }
996}
997
998impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
999    type Error = error::Error;
1000
1001    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
1002        match pb {
1003            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
1004                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
1005                    options
1006                        .set_database_options
1007                        .into_iter()
1008                        .map(SetDatabaseOption::try_from)
1009                        .collect::<Result<Vec<_>>>()?,
1010                )))
1011            }
1012            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
1013                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
1014                    options
1015                        .keys
1016                        .iter()
1017                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
1018                        .collect::<Result<Vec<_>>>()?,
1019                )),
1020            ),
1021        }
1022    }
1023}
1024
/// Option key for the database TTL; incoming keys are lowercased (ASCII)
/// before being compared against it.
const TTL_KEY: &str = "ttl";
1026
1027impl TryFrom<PbOption> for SetDatabaseOption {
1028    type Error = error::Error;
1029
1030    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1031        match key.to_ascii_lowercase().as_str() {
1032            TTL_KEY => {
1033                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1034                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1035
1036                Ok(SetDatabaseOption::Ttl(ttl))
1037            }
1038            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1039        }
1040    }
1041}
1042
1043#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1044pub enum SetDatabaseOption {
1045    Ttl(DatabaseTimeToLive),
1046}
1047
1048#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1049pub enum UnsetDatabaseOption {
1050    Ttl,
1051}
1052
1053impl TryFrom<&str> for UnsetDatabaseOption {
1054    type Error = error::Error;
1055
1056    fn try_from(key: &str) -> Result<Self> {
1057        match key.to_ascii_lowercase().as_str() {
1058            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1059            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1060        }
1061    }
1062}
1063
1064#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1065pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1066
1067#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1068pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1069
1070#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
1071pub enum AlterDatabaseKind {
1072    SetDatabaseOptions(SetDatabaseOptions),
1073    UnsetDatabaseOptions(UnsetDatabaseOptions),
1074}
1075
1076impl AlterDatabaseTask {
1077    pub fn catalog(&self) -> &str {
1078        &self.alter_expr.catalog_name
1079    }
1080
1081    pub fn schema(&self) -> &str {
1082        &self.alter_expr.catalog_name
1083    }
1084}
1085
1086/// Create flow
1087#[derive(Debug, Clone, Serialize, Deserialize)]
1088pub struct CreateFlowTask {
1089    pub catalog_name: String,
1090    pub flow_name: String,
1091    pub source_table_names: Vec<TableName>,
1092    pub sink_table_name: TableName,
1093    pub or_replace: bool,
1094    pub create_if_not_exists: bool,
1095    /// Duration in seconds. Data older than this duration will not be used.
1096    pub expire_after: Option<i64>,
1097    pub comment: String,
1098    pub sql: String,
1099    pub flow_options: HashMap<String, String>,
1100}
1101
1102impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1103    type Error = error::Error;
1104
1105    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1106        let CreateFlowExpr {
1107            catalog_name,
1108            flow_name,
1109            source_table_names,
1110            sink_table_name,
1111            or_replace,
1112            create_if_not_exists,
1113            expire_after,
1114            comment,
1115            sql,
1116            flow_options,
1117        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1118            err_msg: "expected create_flow",
1119        })?;
1120
1121        Ok(CreateFlowTask {
1122            catalog_name,
1123            flow_name,
1124            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1125            sink_table_name: sink_table_name
1126                .context(error::InvalidProtoMsgSnafu {
1127                    err_msg: "expected sink_table_name",
1128                })?
1129                .into(),
1130            or_replace,
1131            create_if_not_exists,
1132            expire_after: expire_after.map(|e| e.value),
1133            comment,
1134            sql,
1135            flow_options,
1136        })
1137    }
1138}
1139
1140impl From<CreateFlowTask> for PbCreateFlowTask {
1141    fn from(
1142        CreateFlowTask {
1143            catalog_name,
1144            flow_name,
1145            source_table_names,
1146            sink_table_name,
1147            or_replace,
1148            create_if_not_exists,
1149            expire_after,
1150            comment,
1151            sql,
1152            flow_options,
1153        }: CreateFlowTask,
1154    ) -> Self {
1155        PbCreateFlowTask {
1156            create_flow: Some(CreateFlowExpr {
1157                catalog_name,
1158                flow_name,
1159                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1160                sink_table_name: Some(sink_table_name.into()),
1161                or_replace,
1162                create_if_not_exists,
1163                expire_after: expire_after.map(|value| ExpireAfter { value }),
1164                comment,
1165                sql,
1166                flow_options,
1167            }),
1168        }
1169    }
1170}
1171
1172/// Drop flow
1173#[derive(Debug, Clone, Serialize, Deserialize)]
1174pub struct DropFlowTask {
1175    pub catalog_name: String,
1176    pub flow_name: String,
1177    pub flow_id: FlowId,
1178    pub drop_if_exists: bool,
1179}
1180
1181impl TryFrom<PbDropFlowTask> for DropFlowTask {
1182    type Error = error::Error;
1183
1184    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1185        let DropFlowExpr {
1186            catalog_name,
1187            flow_name,
1188            flow_id,
1189            drop_if_exists,
1190        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1191            err_msg: "expected drop_flow",
1192        })?;
1193        let flow_id = flow_id
1194            .context(error::InvalidProtoMsgSnafu {
1195                err_msg: "expected flow_id",
1196            })?
1197            .id;
1198        Ok(DropFlowTask {
1199            catalog_name,
1200            flow_name,
1201            flow_id,
1202            drop_if_exists,
1203        })
1204    }
1205}
1206
1207impl From<DropFlowTask> for PbDropFlowTask {
1208    fn from(
1209        DropFlowTask {
1210            catalog_name,
1211            flow_name,
1212            flow_id,
1213            drop_if_exists,
1214        }: DropFlowTask,
1215    ) -> Self {
1216        PbDropFlowTask {
1217            drop_flow: Some(DropFlowExpr {
1218                catalog_name,
1219                flow_name,
1220                flow_id: Some(api::v1::FlowId { id: flow_id }),
1221                drop_if_exists,
1222            }),
1223        }
1224    }
1225}
1226
1227#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1228pub struct QueryContext {
1229    current_catalog: String,
1230    current_schema: String,
1231    timezone: String,
1232    extensions: HashMap<String, String>,
1233    channel: u8,
1234}
1235
1236impl From<QueryContextRef> for QueryContext {
1237    fn from(query_context: QueryContextRef) -> Self {
1238        QueryContext {
1239            current_catalog: query_context.current_catalog().to_string(),
1240            current_schema: query_context.current_schema().to_string(),
1241            timezone: query_context.timezone().to_string(),
1242            extensions: query_context.extensions(),
1243            channel: query_context.channel() as u8,
1244        }
1245    }
1246}
1247
1248impl TryFrom<QueryContext> for session::context::QueryContext {
1249    type Error = error::Error;
1250    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1251        Ok(QueryContextBuilder::default()
1252            .current_catalog(value.current_catalog)
1253            .current_schema(value.current_schema)
1254            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1255            .extensions(value.extensions)
1256            .channel((value.channel as u32).into())
1257            .build())
1258    }
1259}
1260
1261impl From<QueryContext> for PbQueryContext {
1262    fn from(
1263        QueryContext {
1264            current_catalog,
1265            current_schema,
1266            timezone,
1267            extensions,
1268            channel,
1269        }: QueryContext,
1270    ) -> Self {
1271        PbQueryContext {
1272            current_catalog,
1273            current_schema,
1274            timezone,
1275            extensions,
1276            channel: channel as u32,
1277            snapshot_seqs: None,
1278            explain: None,
1279        }
1280    }
1281}
1282
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask};

    /// Builds a `ColumnDef` with the given name and semantic type, leaving
    /// every other field at its default.
    fn column_def(name: &str, semantic_type: SemanticType) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// A `CreateTableTask` survives a JSON round trip unchanged.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let empty_schema = Arc::new(SchemaBuilder::default().build().unwrap());
        let info = test_table_info(1025, "foo", "bar", "baz", empty_schema);
        let task = CreateTableTask::new(
            CreateTableExpr::default(),
            vec![],
            RawTableInfo::from(info),
        );

        let encoded = serde_json::to_vec(&task).unwrap();
        let decoded: CreateTableTask = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(task, decoded);
    }

    /// An `AlterTableTask` survives a JSON round trip unchanged.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let encoded = serde_json::to_vec(&task).unwrap();
        let decoded: AlterTableTask = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(task, decoded);
    }

    /// `sort_columns` reorders the column defs by name and rewrites the
    /// indices stored in the table info to match.
    #[test]
    fn test_sort_columns() {
        // Columns are listed out of name order: column3, column1, column2.
        let tag_column = ColumnSchema::new(
            "column3".to_string(),
            ConcreteDataType::string_datatype(),
            true,
        );
        let time_index_column = ColumnSchema::new(
            "column1".to_string(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field_column = ColumnSchema::new(
            "column2".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        );

        // Table info whose indices refer to the unsorted column order.
        let table_info = RawTableInfo {
            ident: Default::default(),
            meta: RawTableMeta {
                schema: RawSchema {
                    column_schemas: vec![tag_column, time_index_column, field_column],
                    timestamp_index: Some(1),
                    version: 0,
                },
                primary_key_indices: vec![0],
                value_indices: vec![2],
                engine: METRIC_ENGINE_NAME.to_string(),
                next_column_id: 0,
                region_numbers: vec![0],
                options: Default::default(),
                created_on: Default::default(),
                partition_key_indices: Default::default(),
            },
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // Create-table expression with the same (unsorted) column order.
        let expr = CreateTableExpr {
            column_defs: vec![
                column_def("column3", SemanticType::Tag),
                column_def("column1", SemanticType::Timestamp),
                column_def("column2", SemanticType::Field),
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut task = CreateTableTask::new(expr, vec![], table_info);
        task.sort_columns();

        // The column defs are now ordered by name.
        let expected_order = ["column1", "column2", "column3"];
        for (position, expected) in expected_order.iter().enumerate() {
            assert_eq!(task.create_table.column_defs[position].name, *expected);
        }

        // The indices in the table info follow the new order.
        let meta = &task.table_info.meta;
        assert_eq!(meta.schema.timestamp_index, Some(0));
        assert_eq!(meta.primary_key_indices, vec![2]);
        assert_eq!(meta.value_indices, vec![1]);
    }
}