common_meta/rpc/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::result;
17
18use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
19use api::v1::meta::ddl_task_request::Task;
20use api::v1::meta::{
21    AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
22    AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
23    CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
24    CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
25    DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
26    DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
27    DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
28    DropViewTask as PbDropViewTask, Partition, ProcedureId,
29    TruncateTableTask as PbTruncateTableTask,
30};
31use api::v1::{
32    AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
33    CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
34    Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
35};
36use base64::engine::general_purpose;
37use base64::Engine as _;
38use common_time::{DatabaseTimeToLive, Timezone};
39use prost::Message;
40use serde::{Deserialize, Serialize};
41use serde_with::{serde_as, DefaultOnNull};
42use session::context::{QueryContextBuilder, QueryContextRef};
43use snafu::{OptionExt, ResultExt};
44use table::metadata::{RawTableInfo, TableId};
45use table::table_name::TableName;
46use table::table_reference::TableReference;
47
48use crate::error::{
49    self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
50    Result,
51};
52use crate::key::FlowId;
53
/// DDL tasks.
///
/// Each variant carries the payload of one kind of DDL operation. Values are
/// built from the protobuf [`Task`] through `TryFrom<Task>`, and converted
/// back to a [`Task`] when a [`SubmitDdlTaskRequest`] is turned into a
/// [`PbDdlTaskRequest`].
#[derive(Debug, Clone)]
pub enum DdlTask {
    /// Create a table.
    CreateTable(CreateTableTask),
    /// Drop a table.
    DropTable(DropTableTask),
    /// Alter a table.
    AlterTable(AlterTableTask),
    /// Truncate a table.
    TruncateTable(TruncateTableTask),
    /// Create several logical tables; maps to `Task::CreateTableTasks`.
    CreateLogicalTables(Vec<CreateTableTask>),
    /// Batch of drop-table tasks; maps to `Task::DropTableTasks`.
    DropLogicalTables(Vec<DropTableTask>),
    /// Alter several logical tables; maps to `Task::AlterTableTasks`.
    AlterLogicalTables(Vec<AlterTableTask>),
    /// Create a database.
    CreateDatabase(CreateDatabaseTask),
    /// Drop a database.
    DropDatabase(DropDatabaseTask),
    /// Alter a database.
    AlterDatabase(AlterDatabaseTask),
    /// Create a flow.
    CreateFlow(CreateFlowTask),
    /// Drop a flow.
    DropFlow(DropFlowTask),
    /// Create a view.
    CreateView(CreateViewTask),
    /// Drop a view.
    DropView(DropViewTask),
}
72
73impl DdlTask {
74    /// Creates a [`DdlTask`] to create a flow.
75    pub fn new_create_flow(expr: CreateFlowTask) -> Self {
76        DdlTask::CreateFlow(expr)
77    }
78
79    /// Creates a [`DdlTask`] to drop a flow.
80    pub fn new_drop_flow(expr: DropFlowTask) -> Self {
81        DdlTask::DropFlow(expr)
82    }
83
84    /// Creates a [`DdlTask`] to drop a view.
85    pub fn new_drop_view(expr: DropViewTask) -> Self {
86        DdlTask::DropView(expr)
87    }
88
89    /// Creates a [`DdlTask`] to create a table.
90    pub fn new_create_table(
91        expr: CreateTableExpr,
92        partitions: Vec<Partition>,
93        table_info: RawTableInfo,
94    ) -> Self {
95        DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
96    }
97
98    /// Creates a [`DdlTask`] to create several logical tables.
99    pub fn new_create_logical_tables(table_data: Vec<(CreateTableExpr, RawTableInfo)>) -> Self {
100        DdlTask::CreateLogicalTables(
101            table_data
102                .into_iter()
103                .map(|(expr, table_info)| CreateTableTask::new(expr, Vec::new(), table_info))
104                .collect(),
105        )
106    }
107
108    /// Creates a [`DdlTask`] to alter several logical tables.
109    pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
110        DdlTask::AlterLogicalTables(
111            table_data
112                .into_iter()
113                .map(|alter_table| AlterTableTask { alter_table })
114                .collect(),
115        )
116    }
117
118    /// Creates a [`DdlTask`] to drop a table.
119    pub fn new_drop_table(
120        catalog: String,
121        schema: String,
122        table: String,
123        table_id: TableId,
124        drop_if_exists: bool,
125    ) -> Self {
126        DdlTask::DropTable(DropTableTask {
127            catalog,
128            schema,
129            table,
130            table_id,
131            drop_if_exists,
132        })
133    }
134
135    /// Creates a [`DdlTask`] to create a database.
136    pub fn new_create_database(
137        catalog: String,
138        schema: String,
139        create_if_not_exists: bool,
140        options: HashMap<String, String>,
141    ) -> Self {
142        DdlTask::CreateDatabase(CreateDatabaseTask {
143            catalog,
144            schema,
145            create_if_not_exists,
146            options,
147        })
148    }
149
150    /// Creates a [`DdlTask`] to drop a database.
151    pub fn new_drop_database(catalog: String, schema: String, drop_if_exists: bool) -> Self {
152        DdlTask::DropDatabase(DropDatabaseTask {
153            catalog,
154            schema,
155            drop_if_exists,
156        })
157    }
158
159    /// Creates a [`DdlTask`] to alter a database.
160    pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
161        DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
162    }
163
164    /// Creates a [`DdlTask`] to alter a table.
165    pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
166        DdlTask::AlterTable(AlterTableTask { alter_table })
167    }
168
169    /// Creates a [`DdlTask`] to truncate a table.
170    pub fn new_truncate_table(
171        catalog: String,
172        schema: String,
173        table: String,
174        table_id: TableId,
175    ) -> Self {
176        DdlTask::TruncateTable(TruncateTableTask {
177            catalog,
178            schema,
179            table,
180            table_id,
181        })
182    }
183
184    /// Creates a [`DdlTask`] to create a view.
185    pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
186        DdlTask::CreateView(CreateViewTask {
187            create_view,
188            view_info,
189        })
190    }
191}
192
193impl TryFrom<Task> for DdlTask {
194    type Error = error::Error;
195    fn try_from(task: Task) -> Result<Self> {
196        match task {
197            Task::CreateTableTask(create_table) => {
198                Ok(DdlTask::CreateTable(create_table.try_into()?))
199            }
200            Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
201            Task::AlterTableTask(alter_table) => Ok(DdlTask::AlterTable(alter_table.try_into()?)),
202            Task::TruncateTableTask(truncate_table) => {
203                Ok(DdlTask::TruncateTable(truncate_table.try_into()?))
204            }
205            Task::CreateTableTasks(create_tables) => {
206                let tasks = create_tables
207                    .tasks
208                    .into_iter()
209                    .map(|task| task.try_into())
210                    .collect::<Result<Vec<_>>>()?;
211
212                Ok(DdlTask::CreateLogicalTables(tasks))
213            }
214            Task::DropTableTasks(drop_tables) => {
215                let tasks = drop_tables
216                    .tasks
217                    .into_iter()
218                    .map(|task| task.try_into())
219                    .collect::<Result<Vec<_>>>()?;
220
221                Ok(DdlTask::DropLogicalTables(tasks))
222            }
223            Task::AlterTableTasks(alter_tables) => {
224                let tasks = alter_tables
225                    .tasks
226                    .into_iter()
227                    .map(|task| task.try_into())
228                    .collect::<Result<Vec<_>>>()?;
229
230                Ok(DdlTask::AlterLogicalTables(tasks))
231            }
232            Task::CreateDatabaseTask(create_database) => {
233                Ok(DdlTask::CreateDatabase(create_database.try_into()?))
234            }
235            Task::DropDatabaseTask(drop_database) => {
236                Ok(DdlTask::DropDatabase(drop_database.try_into()?))
237            }
238            Task::AlterDatabaseTask(alter_database) => {
239                Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
240            }
241            Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
242            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
243            Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
244            Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
245        }
246    }
247}
248
/// A [`DdlTask`] paired with the query context it is submitted with.
///
/// Convertible into a [`PbDdlTaskRequest`] through `TryFrom`.
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
    /// Query context attached to the request; it is cloned into the
    /// protobuf request's `query_context` field on conversion.
    pub query_context: QueryContextRef,
    /// The DDL task to submit.
    pub task: DdlTask,
}
254
255impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
256    type Error = error::Error;
257
258    fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
259        let task = match request.task {
260            DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
261            DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
262            DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
263            DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
264            DdlTask::CreateLogicalTables(tasks) => {
265                let tasks = tasks
266                    .into_iter()
267                    .map(|task| task.try_into())
268                    .collect::<Result<Vec<_>>>()?;
269
270                Task::CreateTableTasks(PbCreateTableTasks { tasks })
271            }
272            DdlTask::DropLogicalTables(tasks) => {
273                let tasks = tasks
274                    .into_iter()
275                    .map(|task| task.into())
276                    .collect::<Vec<_>>();
277
278                Task::DropTableTasks(PbDropTableTasks { tasks })
279            }
280            DdlTask::AlterLogicalTables(tasks) => {
281                let tasks = tasks
282                    .into_iter()
283                    .map(|task| task.try_into())
284                    .collect::<Result<Vec<_>>>()?;
285
286                Task::AlterTableTasks(PbAlterTableTasks { tasks })
287            }
288            DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
289            DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
290            DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
291            DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
292            DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
293            DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
294            DdlTask::DropView(task) => Task::DropViewTask(task.into()),
295        };
296
297        Ok(Self {
298            header: None,
299            query_context: Some((*request.query_context).clone().into()),
300            task: Some(task),
301        })
302    }
303}
304
/// Response to a submitted DDL task.
///
/// Converts from and into [`PbDdlTaskResponse`].
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
    /// Key of the procedure id (`pid.key`) from the protobuf response;
    /// empty when the response carries no `pid`.
    pub key: Vec<u8>,
    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
    pub table_ids: Vec<TableId>,
}
311
312impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
313    type Error = error::Error;
314
315    fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
316        let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
317        Ok(Self {
318            key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
319            table_ids,
320        })
321    }
322}
323
324impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
325    fn from(val: SubmitDdlTaskResponse) -> Self {
326        Self {
327            pid: Some(ProcedureId { key: val.key }),
328            table_ids: val
329                .table_ids
330                .into_iter()
331                .map(|id| api::v1::TableId { id })
332                .collect(),
333            ..Default::default()
334        }
335    }
336}
337
/// A `CREATE VIEW` task.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbCreateViewTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    /// The protobuf `CREATE VIEW` expression (names, logical plan,
    /// definition, columns, referenced table names).
    pub create_view: CreateViewExpr,
    /// Raw table info of the view; carried as JSON bytes in the protobuf
    /// task.
    pub view_info: RawTableInfo,
}
344
345impl CreateViewTask {
346    /// Returns the [`TableReference`] of view.
347    pub fn table_ref(&self) -> TableReference {
348        TableReference {
349            catalog: &self.create_view.catalog_name,
350            schema: &self.create_view.schema_name,
351            table: &self.create_view.view_name,
352        }
353    }
354
355    /// Returns the encoded logical plan
356    pub fn raw_logical_plan(&self) -> &Vec<u8> {
357        &self.create_view.logical_plan
358    }
359
360    /// Returns the view definition in SQL
361    pub fn view_definition(&self) -> &str {
362        &self.create_view.definition
363    }
364
365    /// Returns the resolved table names in view's logical plan
366    pub fn table_names(&self) -> HashSet<TableName> {
367        self.create_view
368            .table_names
369            .iter()
370            .map(|t| t.clone().into())
371            .collect()
372    }
373
374    /// Returns the view's columns
375    pub fn columns(&self) -> &Vec<String> {
376        &self.create_view.columns
377    }
378
379    /// Returns the original logical plan's columns
380    pub fn plan_columns(&self) -> &Vec<String> {
381        &self.create_view.plan_columns
382    }
383}
384
385impl TryFrom<PbCreateViewTask> for CreateViewTask {
386    type Error = error::Error;
387
388    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
389        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;
390
391        Ok(CreateViewTask {
392            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
393                err_msg: "expected create view",
394            })?,
395            view_info,
396        })
397    }
398}
399
400impl TryFrom<CreateViewTask> for PbCreateViewTask {
401    type Error = error::Error;
402
403    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
404        Ok(PbCreateViewTask {
405            create_view: Some(task.create_view),
406            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
407        })
408    }
409}
410
411impl Serialize for CreateViewTask {
412    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
413    where
414        S: serde::Serializer,
415    {
416        let view_info = serde_json::to_vec(&self.view_info)
417            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
418
419        let pb = PbCreateViewTask {
420            create_view: Some(self.create_view.clone()),
421            view_info,
422        };
423        let buf = pb.encode_to_vec();
424        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
425        serializer.serialize_str(&encoded)
426    }
427}
428
429impl<'de> Deserialize<'de> for CreateViewTask {
430    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
431    where
432        D: serde::Deserializer<'de>,
433    {
434        let encoded = String::deserialize(deserializer)?;
435        let buf = general_purpose::STANDARD_NO_PAD
436            .decode(encoded)
437            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
438        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
439            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
440
441        let expr = CreateViewTask::try_from(expr)
442            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
443
444        Ok(expr)
445    }
446}
447
/// A `DROP VIEW` task.
///
/// Converts from and into [`PbDropViewTask`].
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct DropViewTask {
    /// Catalog name of the view.
    pub catalog: String,
    /// Schema name of the view.
    pub schema: String,
    /// Name of the view.
    pub view: String,
    /// Id of the view.
    pub view_id: TableId,
    /// Mirrors the `drop_if_exists` flag of the protobuf [`DropViewExpr`].
    pub drop_if_exists: bool,
}
457
458impl DropViewTask {
459    /// Returns the [`TableReference`] of view.
460    pub fn table_ref(&self) -> TableReference {
461        TableReference {
462            catalog: &self.catalog,
463            schema: &self.schema,
464            table: &self.view,
465        }
466    }
467}
468
469impl TryFrom<PbDropViewTask> for DropViewTask {
470    type Error = error::Error;
471
472    fn try_from(pb: PbDropViewTask) -> Result<Self> {
473        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
474            err_msg: "expected drop view",
475        })?;
476
477        Ok(DropViewTask {
478            catalog: expr.catalog_name,
479            schema: expr.schema_name,
480            view: expr.view_name,
481            view_id: expr
482                .view_id
483                .context(error::InvalidProtoMsgSnafu {
484                    err_msg: "expected view_id",
485                })?
486                .id,
487            drop_if_exists: expr.drop_if_exists,
488        })
489    }
490}
491
492impl From<DropViewTask> for PbDropViewTask {
493    fn from(task: DropViewTask) -> Self {
494        PbDropViewTask {
495            drop_view: Some(DropViewExpr {
496                catalog_name: task.catalog,
497                schema_name: task.schema,
498                view_name: task.view,
499                view_id: Some(api::v1::TableId { id: task.view_id }),
500                drop_if_exists: task.drop_if_exists,
501            }),
502        }
503    }
504}
505
/// A `DROP TABLE` task.
///
/// Converts from and into [`PbDropTableTask`].
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
    /// Mirrors the `drop_if_exists` flag of the protobuf [`DropTableExpr`].
    /// Falls back to the default (`false`) when the field is absent from
    /// the serialized form.
    #[serde(default)]
    pub drop_if_exists: bool,
}
515
516impl DropTableTask {
517    pub fn table_ref(&self) -> TableReference {
518        TableReference {
519            catalog: &self.catalog,
520            schema: &self.schema,
521            table: &self.table,
522        }
523    }
524
525    pub fn table_name(&self) -> TableName {
526        TableName {
527            catalog_name: self.catalog.to_string(),
528            schema_name: self.schema.to_string(),
529            table_name: self.table.to_string(),
530        }
531    }
532}
533
534impl TryFrom<PbDropTableTask> for DropTableTask {
535    type Error = error::Error;
536
537    fn try_from(pb: PbDropTableTask) -> Result<Self> {
538        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
539            err_msg: "expected drop table",
540        })?;
541
542        Ok(Self {
543            catalog: drop_table.catalog_name,
544            schema: drop_table.schema_name,
545            table: drop_table.table_name,
546            table_id: drop_table
547                .table_id
548                .context(error::InvalidProtoMsgSnafu {
549                    err_msg: "expected table_id",
550                })?
551                .id,
552            drop_if_exists: drop_table.drop_if_exists,
553        })
554    }
555}
556
557impl From<DropTableTask> for PbDropTableTask {
558    fn from(task: DropTableTask) -> Self {
559        PbDropTableTask {
560            drop_table: Some(DropTableExpr {
561                catalog_name: task.catalog,
562                schema_name: task.schema,
563                table_name: task.table,
564                table_id: Some(api::v1::TableId { id: task.table_id }),
565                drop_if_exists: task.drop_if_exists,
566            }),
567        }
568    }
569}
570
/// A `CREATE TABLE` task.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbCreateTableTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
    /// The protobuf `CREATE TABLE` expression.
    pub create_table: CreateTableExpr,
    /// Partitions of the table; empty for tasks built by
    /// [`DdlTask::new_create_logical_tables`].
    pub partitions: Vec<Partition>,
    /// Raw table info; carried as JSON bytes in the protobuf task.
    pub table_info: RawTableInfo,
}
577
578impl TryFrom<PbCreateTableTask> for CreateTableTask {
579    type Error = error::Error;
580
581    fn try_from(pb: PbCreateTableTask) -> Result<Self> {
582        let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
583
584        Ok(CreateTableTask::new(
585            pb.create_table.context(error::InvalidProtoMsgSnafu {
586                err_msg: "expected create table",
587            })?,
588            pb.partitions,
589            table_info,
590        ))
591    }
592}
593
594impl TryFrom<CreateTableTask> for PbCreateTableTask {
595    type Error = error::Error;
596
597    fn try_from(task: CreateTableTask) -> Result<Self> {
598        Ok(PbCreateTableTask {
599            table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
600            create_table: Some(task.create_table),
601            partitions: task.partitions,
602        })
603    }
604}
605
606impl CreateTableTask {
607    pub fn new(
608        expr: CreateTableExpr,
609        partitions: Vec<Partition>,
610        table_info: RawTableInfo,
611    ) -> CreateTableTask {
612        CreateTableTask {
613            create_table: expr,
614            partitions,
615            table_info,
616        }
617    }
618
619    pub fn table_name(&self) -> TableName {
620        let table = &self.create_table;
621
622        TableName {
623            catalog_name: table.catalog_name.to_string(),
624            schema_name: table.schema_name.to_string(),
625            table_name: table.table_name.to_string(),
626        }
627    }
628
629    pub fn table_ref(&self) -> TableReference {
630        let table = &self.create_table;
631
632        TableReference {
633            catalog: &table.catalog_name,
634            schema: &table.schema_name,
635            table: &table.table_name,
636        }
637    }
638
639    /// Sets the `table_info`'s table_id.
640    pub fn set_table_id(&mut self, table_id: TableId) {
641        self.table_info.ident.table_id = table_id;
642    }
643
644    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
645    ///
646    /// This function won't do any check or verification. Caller should
647    /// ensure this task is valid.
648    pub fn sort_columns(&mut self) {
649        // sort create table expr
650        // sort column_defs by name
651        self.create_table
652            .column_defs
653            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
654
655        self.table_info.sort_columns();
656    }
657}
658
659impl Serialize for CreateTableTask {
660    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
661    where
662        S: serde::Serializer,
663    {
664        let table_info = serde_json::to_vec(&self.table_info)
665            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
666
667        let pb = PbCreateTableTask {
668            create_table: Some(self.create_table.clone()),
669            partitions: self.partitions.clone(),
670            table_info,
671        };
672        let buf = pb.encode_to_vec();
673        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
674        serializer.serialize_str(&encoded)
675    }
676}
677
678impl<'de> Deserialize<'de> for CreateTableTask {
679    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
680    where
681        D: serde::Deserializer<'de>,
682    {
683        let encoded = String::deserialize(deserializer)?;
684        let buf = general_purpose::STANDARD_NO_PAD
685            .decode(encoded)
686            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
687        let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
688            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
689
690        let expr = CreateTableTask::try_from(expr)
691            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
692
693        Ok(expr)
694    }
695}
696
/// An `ALTER TABLE` task.
///
/// Serialized through serde as a base64 string of the protobuf-encoded
/// [`PbAlterTableTask`] (see the manual `Serialize`/`Deserialize` impls).
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
    // TODO(CookiePieWw): Replace proto struct with user-defined struct
    /// The protobuf `ALTER TABLE` expression.
    pub alter_table: AlterTableExpr,
}
702
703impl AlterTableTask {
704    pub fn validate(&self) -> Result<()> {
705        self.alter_table
706            .kind
707            .as_ref()
708            .context(error::UnexpectedSnafu {
709                err_msg: "'kind' is absent",
710            })?;
711        Ok(())
712    }
713
714    pub fn table_ref(&self) -> TableReference {
715        TableReference {
716            catalog: &self.alter_table.catalog_name,
717            schema: &self.alter_table.schema_name,
718            table: &self.alter_table.table_name,
719        }
720    }
721
722    pub fn table_name(&self) -> TableName {
723        let table = &self.alter_table;
724
725        TableName {
726            catalog_name: table.catalog_name.to_string(),
727            schema_name: table.schema_name.to_string(),
728            table_name: table.table_name.to_string(),
729        }
730    }
731}
732
733impl TryFrom<PbAlterTableTask> for AlterTableTask {
734    type Error = error::Error;
735
736    fn try_from(pb: PbAlterTableTask) -> Result<Self> {
737        let alter_table = pb.alter_table.context(error::InvalidProtoMsgSnafu {
738            err_msg: "expected alter_table",
739        })?;
740
741        Ok(AlterTableTask { alter_table })
742    }
743}
744
745impl TryFrom<AlterTableTask> for PbAlterTableTask {
746    type Error = error::Error;
747
748    fn try_from(task: AlterTableTask) -> Result<Self> {
749        Ok(PbAlterTableTask {
750            alter_table: Some(task.alter_table),
751        })
752    }
753}
754
755impl Serialize for AlterTableTask {
756    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
757    where
758        S: serde::Serializer,
759    {
760        let pb = PbAlterTableTask {
761            alter_table: Some(self.alter_table.clone()),
762        };
763        let buf = pb.encode_to_vec();
764        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
765        serializer.serialize_str(&encoded)
766    }
767}
768
769impl<'de> Deserialize<'de> for AlterTableTask {
770    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
771    where
772        D: serde::Deserializer<'de>,
773    {
774        let encoded = String::deserialize(deserializer)?;
775        let buf = general_purpose::STANDARD_NO_PAD
776            .decode(encoded)
777            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
778        let expr: PbAlterTableTask = PbAlterTableTask::decode(&*buf)
779            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
780
781        let expr = AlterTableTask::try_from(expr)
782            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
783
784        Ok(expr)
785    }
786}
787
/// A `TRUNCATE TABLE` task.
///
/// Converts from and into [`PbTruncateTableTask`].
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
    /// Catalog name of the table.
    pub catalog: String,
    /// Schema name of the table.
    pub schema: String,
    /// Name of the table.
    pub table: String,
    /// Id of the table.
    pub table_id: TableId,
}
795
796impl TruncateTableTask {
797    pub fn table_ref(&self) -> TableReference {
798        TableReference {
799            catalog: &self.catalog,
800            schema: &self.schema,
801            table: &self.table,
802        }
803    }
804
805    pub fn table_name(&self) -> TableName {
806        TableName {
807            catalog_name: self.catalog.to_string(),
808            schema_name: self.schema.to_string(),
809            table_name: self.table.to_string(),
810        }
811    }
812}
813
814impl TryFrom<PbTruncateTableTask> for TruncateTableTask {
815    type Error = error::Error;
816
817    fn try_from(pb: PbTruncateTableTask) -> Result<Self> {
818        let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu {
819            err_msg: "expected truncate table",
820        })?;
821
822        Ok(Self {
823            catalog: truncate_table.catalog_name,
824            schema: truncate_table.schema_name,
825            table: truncate_table.table_name,
826            table_id: truncate_table
827                .table_id
828                .context(error::InvalidProtoMsgSnafu {
829                    err_msg: "expected table_id",
830                })?
831                .id,
832        })
833    }
834}
835
836impl TryFrom<TruncateTableTask> for PbTruncateTableTask {
837    type Error = error::Error;
838
839    fn try_from(task: TruncateTableTask) -> Result<Self> {
840        Ok(PbTruncateTableTask {
841            truncate_table: Some(TruncateTableExpr {
842                catalog_name: task.catalog,
843                schema_name: task.schema,
844                table_name: task.table,
845                table_id: Some(api::v1::TableId { id: task.table_id }),
846            }),
847        })
848    }
849}
850
/// A `CREATE DATABASE` task.
///
/// Converts from and into [`PbCreateDatabaseTask`].
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct CreateDatabaseTask {
    /// Catalog name of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
    /// Mirrors the `create_if_not_exists` flag of the protobuf
    /// [`CreateDatabaseExpr`].
    pub create_if_not_exists: bool,
    /// Database options. A `null` value in the serialized form
    /// deserializes to the default (empty) map.
    #[serde_as(deserialize_as = "DefaultOnNull")]
    pub options: HashMap<String, String>,
}
860
861impl TryFrom<PbCreateDatabaseTask> for CreateDatabaseTask {
862    type Error = error::Error;
863
864    fn try_from(pb: PbCreateDatabaseTask) -> Result<Self> {
865        let CreateDatabaseExpr {
866            catalog_name,
867            schema_name,
868            create_if_not_exists,
869            options,
870        } = pb.create_database.context(error::InvalidProtoMsgSnafu {
871            err_msg: "expected create database",
872        })?;
873
874        Ok(CreateDatabaseTask {
875            catalog: catalog_name,
876            schema: schema_name,
877            create_if_not_exists,
878            options,
879        })
880    }
881}
882
883impl TryFrom<CreateDatabaseTask> for PbCreateDatabaseTask {
884    type Error = error::Error;
885
886    fn try_from(
887        CreateDatabaseTask {
888            catalog,
889            schema,
890            create_if_not_exists,
891            options,
892        }: CreateDatabaseTask,
893    ) -> Result<Self> {
894        Ok(PbCreateDatabaseTask {
895            create_database: Some(CreateDatabaseExpr {
896                catalog_name: catalog,
897                schema_name: schema,
898                create_if_not_exists,
899                options,
900            }),
901        })
902    }
903}
904
/// A `DROP DATABASE` task.
///
/// Converts from and into [`PbDropDatabaseTask`].
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropDatabaseTask {
    /// Catalog name of the database.
    pub catalog: String,
    /// Schema (database) name.
    pub schema: String,
    /// Mirrors the `drop_if_exists` flag of the protobuf
    /// [`DropDatabaseExpr`].
    pub drop_if_exists: bool,
}
911
912impl TryFrom<PbDropDatabaseTask> for DropDatabaseTask {
913    type Error = error::Error;
914
915    fn try_from(pb: PbDropDatabaseTask) -> Result<Self> {
916        let DropDatabaseExpr {
917            catalog_name,
918            schema_name,
919            drop_if_exists,
920        } = pb.drop_database.context(error::InvalidProtoMsgSnafu {
921            err_msg: "expected drop database",
922        })?;
923
924        Ok(DropDatabaseTask {
925            catalog: catalog_name,
926            schema: schema_name,
927            drop_if_exists,
928        })
929    }
930}
931
932impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
933    type Error = error::Error;
934
935    fn try_from(
936        DropDatabaseTask {
937            catalog,
938            schema,
939            drop_if_exists,
940        }: DropDatabaseTask,
941    ) -> Result<Self> {
942        Ok(PbDropDatabaseTask {
943            drop_database: Some(DropDatabaseExpr {
944                catalog_name: catalog,
945                schema_name: schema,
946                drop_if_exists,
947            }),
948        })
949    }
950}
951
/// An alter-database DDL task; a thin wrapper around the protobuf
/// [`AlterDatabaseExpr`].
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
    /// The protobuf expression describing the alteration, stored as-is.
    pub alter_expr: AlterDatabaseExpr,
}
956
957impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
958    type Error = error::Error;
959
960    fn try_from(task: AlterDatabaseTask) -> Result<Self> {
961        Ok(PbAlterDatabaseTask {
962            task: Some(task.alter_expr),
963        })
964    }
965}
966
967impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
968    type Error = error::Error;
969
970    fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
971        let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
972            err_msg: "expected alter database",
973        })?;
974
975        Ok(AlterDatabaseTask { alter_expr })
976    }
977}
978
979impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
980    type Error = error::Error;
981
982    fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
983        match pb {
984            PbAlterDatabaseKind::SetDatabaseOptions(options) => {
985                Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
986                    options
987                        .set_database_options
988                        .into_iter()
989                        .map(SetDatabaseOption::try_from)
990                        .collect::<Result<Vec<_>>>()?,
991                )))
992            }
993            PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
994                AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
995                    options
996                        .keys
997                        .iter()
998                        .map(|key| UnsetDatabaseOption::try_from(key.as_str()))
999                        .collect::<Result<Vec<_>>>()?,
1000                )),
1001            ),
1002        }
1003    }
1004}
1005
/// Option key for the database TTL. Incoming keys are ASCII-lowercased before
/// being compared against it, so the match is case-insensitive.
const TTL_KEY: &str = "ttl";
1007
1008impl TryFrom<PbOption> for SetDatabaseOption {
1009    type Error = error::Error;
1010
1011    fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
1012        match key.to_ascii_lowercase().as_str() {
1013            TTL_KEY => {
1014                let ttl = DatabaseTimeToLive::from_humantime_or_str(&value)
1015                    .map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?;
1016
1017                Ok(SetDatabaseOption::Ttl(ttl))
1018            }
1019            _ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
1020        }
1021    }
1022}
1023
/// A single database option to set, parsed from a protobuf key/value option.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
    /// The database time-to-live, parsed from the value of the `ttl` key.
    Ttl(DatabaseTimeToLive),
}
1028
/// A single database option to unset, parsed from an option key.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
    /// Selected by the `ttl` key.
    Ttl,
}
1033
1034impl TryFrom<&str> for UnsetDatabaseOption {
1035    type Error = error::Error;
1036
1037    fn try_from(key: &str) -> Result<Self> {
1038        match key.to_ascii_lowercase().as_str() {
1039            TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
1040            _ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
1041        }
1042    }
1043}
1044
/// Newtype around the list of options to set in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
1047
/// Newtype around the list of options to unset in one alter-database request.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
1050
/// Typed counterpart of the protobuf alter-database kind
/// (`PbAlterDatabaseKind`).
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
    /// Set the contained database options.
    SetDatabaseOptions(SetDatabaseOptions),
    /// Unset the contained database options.
    UnsetDatabaseOptions(UnsetDatabaseOptions),
}
1056
1057impl AlterDatabaseTask {
1058    pub fn catalog(&self) -> &str {
1059        &self.alter_expr.catalog_name
1060    }
1061
1062    pub fn schema(&self) -> &str {
1063        &self.alter_expr.catalog_name
1064    }
1065}
1066
/// A create-flow DDL task, the native counterpart of [`PbCreateFlowTask`].
///
/// Unless noted otherwise, each field is copied verbatim from the field of the
/// same name in `CreateFlowExpr`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
    pub catalog_name: String,
    pub flow_name: String,
    /// Source tables, converted from their protobuf table names.
    pub source_table_names: Vec<TableName>,
    /// Sink table; required when converting from protobuf.
    pub sink_table_name: TableName,
    pub or_replace: bool,
    pub create_if_not_exists: bool,
    /// Duration in seconds. Data older than this duration will not be used.
    pub expire_after: Option<i64>,
    pub comment: String,
    pub sql: String,
    pub flow_options: HashMap<String, String>,
}
1082
1083impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
1084    type Error = error::Error;
1085
1086    fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
1087        let CreateFlowExpr {
1088            catalog_name,
1089            flow_name,
1090            source_table_names,
1091            sink_table_name,
1092            or_replace,
1093            create_if_not_exists,
1094            expire_after,
1095            comment,
1096            sql,
1097            flow_options,
1098        } = pb.create_flow.context(error::InvalidProtoMsgSnafu {
1099            err_msg: "expected create_flow",
1100        })?;
1101
1102        Ok(CreateFlowTask {
1103            catalog_name,
1104            flow_name,
1105            source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1106            sink_table_name: sink_table_name
1107                .context(error::InvalidProtoMsgSnafu {
1108                    err_msg: "expected sink_table_name",
1109                })?
1110                .into(),
1111            or_replace,
1112            create_if_not_exists,
1113            expire_after: expire_after.map(|e| e.value),
1114            comment,
1115            sql,
1116            flow_options,
1117        })
1118    }
1119}
1120
1121impl From<CreateFlowTask> for PbCreateFlowTask {
1122    fn from(
1123        CreateFlowTask {
1124            catalog_name,
1125            flow_name,
1126            source_table_names,
1127            sink_table_name,
1128            or_replace,
1129            create_if_not_exists,
1130            expire_after,
1131            comment,
1132            sql,
1133            flow_options,
1134        }: CreateFlowTask,
1135    ) -> Self {
1136        PbCreateFlowTask {
1137            create_flow: Some(CreateFlowExpr {
1138                catalog_name,
1139                flow_name,
1140                source_table_names: source_table_names.into_iter().map(Into::into).collect(),
1141                sink_table_name: Some(sink_table_name.into()),
1142                or_replace,
1143                create_if_not_exists,
1144                expire_after: expire_after.map(|value| ExpireAfter { value }),
1145                comment,
1146                sql,
1147                flow_options,
1148            }),
1149        }
1150    }
1151}
1152
/// A drop-flow DDL task, the native counterpart of [`PbDropFlowTask`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropFlowTask {
    /// Copied from `DropFlowExpr::catalog_name`.
    pub catalog_name: String,
    /// Copied from `DropFlowExpr::flow_name`.
    pub flow_name: String,
    /// The `id` carried by `DropFlowExpr::flow_id`; required when converting
    /// from protobuf.
    pub flow_id: FlowId,
    /// Mirrors `DropFlowExpr::drop_if_exists`.
    pub drop_if_exists: bool,
}
1161
1162impl TryFrom<PbDropFlowTask> for DropFlowTask {
1163    type Error = error::Error;
1164
1165    fn try_from(pb: PbDropFlowTask) -> Result<Self> {
1166        let DropFlowExpr {
1167            catalog_name,
1168            flow_name,
1169            flow_id,
1170            drop_if_exists,
1171        } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
1172            err_msg: "expected drop_flow",
1173        })?;
1174        let flow_id = flow_id
1175            .context(error::InvalidProtoMsgSnafu {
1176                err_msg: "expected flow_id",
1177            })?
1178            .id;
1179        Ok(DropFlowTask {
1180            catalog_name,
1181            flow_name,
1182            flow_id,
1183            drop_if_exists,
1184        })
1185    }
1186}
1187
1188impl From<DropFlowTask> for PbDropFlowTask {
1189    fn from(
1190        DropFlowTask {
1191            catalog_name,
1192            flow_name,
1193            flow_id,
1194            drop_if_exists,
1195        }: DropFlowTask,
1196    ) -> Self {
1197        PbDropFlowTask {
1198            drop_flow: Some(DropFlowExpr {
1199                catalog_name,
1200                flow_name,
1201                flow_id: Some(api::v1::FlowId { id: flow_id }),
1202                drop_if_exists,
1203            }),
1204        }
1205    }
1206}
1207
/// A serializable copy of selected session query-context fields.
///
/// Built from a `QueryContextRef` and convertible into both
/// `session::context::QueryContext` and the protobuf [`PbQueryContext`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
    current_catalog: String,
    current_schema: String,
    /// Timezone in string form; parsed back with `Timezone::from_tz_string`
    /// when rebuilding a session context.
    timezone: String,
    extensions: HashMap<String, String>,
    /// Channel stored as a `u8`; it is widened to `u32` when converting to the
    /// session or protobuf representation.
    channel: u8,
}
1216
1217impl From<QueryContextRef> for QueryContext {
1218    fn from(query_context: QueryContextRef) -> Self {
1219        QueryContext {
1220            current_catalog: query_context.current_catalog().to_string(),
1221            current_schema: query_context.current_schema().to_string(),
1222            timezone: query_context.timezone().to_string(),
1223            extensions: query_context.extensions(),
1224            channel: query_context.channel() as u8,
1225        }
1226    }
1227}
1228
1229impl TryFrom<QueryContext> for session::context::QueryContext {
1230    type Error = error::Error;
1231    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
1232        Ok(QueryContextBuilder::default()
1233            .current_catalog(value.current_catalog)
1234            .current_schema(value.current_schema)
1235            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
1236            .extensions(value.extensions)
1237            .channel((value.channel as u32).into())
1238            .build())
1239    }
1240}
1241
1242impl From<QueryContext> for PbQueryContext {
1243    fn from(
1244        QueryContext {
1245            current_catalog,
1246            current_schema,
1247            timezone,
1248            extensions,
1249            channel,
1250        }: QueryContext,
1251    ) -> Self {
1252        PbQueryContext {
1253            current_catalog,
1254            current_schema,
1255            timezone,
1256            extensions,
1257            channel: channel as u32,
1258            snapshot_seqs: None,
1259            explain: None,
1260        }
1261    }
1262}
1263
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
    use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
    use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
    use store_api::storage::ConcreteDataType;
    use table::metadata::{RawTableInfo, RawTableMeta, TableType};
    use table::test_util::table_info::test_table_info;

    use super::{AlterTableTask, CreateTableTask};

    /// Builds a protobuf column definition with only its name and semantic
    /// type set; every other field keeps its default.
    fn column_def(name: &str, semantic_type: SemanticType) -> ColumnDef {
        ColumnDef {
            name: name.to_string(),
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// A `CreateTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = Arc::new(SchemaBuilder::default().build().unwrap());
        let raw_table_info = RawTableInfo::from(test_table_info(1025, "foo", "bar", "baz", schema));
        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), raw_table_info);

        let encoded = serde_json::to_vec(&task).unwrap();
        let decoded: CreateTableTask = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(task, decoded);
    }

    /// An `AlterTableTask` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_basic_ser_de_alter_table_task() {
        let task = AlterTableTask {
            alter_table: AlterTableExpr::default(),
        };

        let encoded = serde_json::to_vec(&task).unwrap();
        let decoded: AlterTableTask = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(task, decoded);
    }

    /// `sort_columns` must reorder the column definitions and remap the
    /// indices stored in the table metadata accordingly.
    #[test]
    fn test_sort_columns() {
        // Columns are deliberately declared out of order: tag, time index, field.
        let tag_column = ColumnSchema::new(
            "column3".to_string(),
            ConcreteDataType::string_datatype(),
            true,
        );
        let time_index_column = ColumnSchema::new(
            "column1".to_string(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_time_index(true);
        let field_column = ColumnSchema::new(
            "column2".to_string(),
            ConcreteDataType::float64_datatype(),
            true,
        );

        let schema = RawSchema {
            column_schemas: vec![tag_column, time_index_column, field_column],
            timestamp_index: Some(1),
            version: 0,
        };

        let meta = RawTableMeta {
            schema,
            primary_key_indices: vec![0],
            value_indices: vec![2],
            engine: METRIC_ENGINE_NAME.to_string(),
            next_column_id: 0,
            region_numbers: vec![0],
            options: Default::default(),
            created_on: Default::default(),
            partition_key_indices: Default::default(),
        };

        let table_info = RawTableInfo {
            ident: Default::default(),
            meta,
            name: Default::default(),
            desc: Default::default(),
            catalog_name: Default::default(),
            schema_name: Default::default(),
            table_type: TableType::Base,
        };

        // The create-table expression lists the columns in the same order as
        // the raw schema above.
        let expr = CreateTableExpr {
            column_defs: vec![
                column_def("column3", SemanticType::Tag),
                column_def("column1", SemanticType::Timestamp),
                column_def("column2", SemanticType::Field),
            ],
            primary_keys: vec!["column3".to_string()],
            ..Default::default()
        };

        let mut task = CreateTableTask::new(expr, Vec::new(), table_info);
        task.sort_columns();

        // The column definitions come back in sorted order.
        let sorted_defs = &task.create_table.column_defs;
        for (position, expected) in ["column1", "column2", "column3"].iter().enumerate() {
            assert_eq!(sorted_defs[position].name, *expected);
        }

        // The indices in the table info follow the columns to their new positions.
        let sorted_meta = &task.table_info.meta;
        assert_eq!(sorted_meta.schema.timestamp_index, Some(0));
        assert_eq!(sorted_meta.primary_key_indices, vec![2]);
        assert_eq!(sorted_meta.value_indices, vec![1]);
    }
}