common_meta/ddl/
create_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use async_trait::async_trait;
18use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
19use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
20use serde::{Deserialize, Serialize};
21use serde_with::{serde_as, DefaultOnNull};
22use snafu::{ensure, ResultExt};
23use strum::AsRefStr;
24
25use crate::ddl::utils::handle_retry_error;
26use crate::ddl::DdlContext;
27use crate::error::{self, Result};
28use crate::key::schema_name::{SchemaNameKey, SchemaNameValue};
29use crate::lock_key::{CatalogLock, SchemaLock};
30
31pub struct CreateDatabaseProcedure {
32    pub context: DdlContext,
33    pub data: CreateDatabaseData,
34}
35
36impl CreateDatabaseProcedure {
37    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase";
38
39    pub fn new(
40        catalog: String,
41        schema: String,
42        create_if_not_exists: bool,
43        options: HashMap<String, String>,
44        context: DdlContext,
45    ) -> Self {
46        Self {
47            context,
48            data: CreateDatabaseData {
49                state: CreateDatabaseState::Prepare,
50                catalog,
51                schema,
52                create_if_not_exists,
53                options,
54            },
55        }
56    }
57
58    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
59        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
60
61        Ok(Self { context, data })
62    }
63
64    pub async fn on_prepare(&mut self) -> Result<Status> {
65        let exists = self
66            .context
67            .table_metadata_manager
68            .schema_manager()
69            .exists(SchemaNameKey::new(&self.data.catalog, &self.data.schema))
70            .await?;
71
72        if exists && self.data.create_if_not_exists {
73            return Ok(Status::done());
74        }
75
76        ensure!(
77            !exists,
78            error::SchemaAlreadyExistsSnafu {
79                catalog: &self.data.catalog,
80                schema: &self.data.schema,
81            }
82        );
83
84        self.data.state = CreateDatabaseState::CreateMetadata;
85        Ok(Status::executing(true))
86    }
87
88    pub async fn on_create_metadata(&mut self) -> Result<Status> {
89        let value: SchemaNameValue = (&self.data.options).try_into()?;
90
91        self.context
92            .table_metadata_manager
93            .schema_manager()
94            .create(
95                SchemaNameKey::new(&self.data.catalog, &self.data.schema),
96                Some(value),
97                self.data.create_if_not_exists,
98            )
99            .await?;
100
101        Ok(Status::done())
102    }
103}
104
105#[async_trait]
106impl Procedure for CreateDatabaseProcedure {
107    fn type_name(&self) -> &str {
108        Self::TYPE_NAME
109    }
110
111    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
112        let state = &self.data.state;
113
114        match state {
115            CreateDatabaseState::Prepare => self.on_prepare().await,
116            CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
117        }
118        .map_err(handle_retry_error)
119    }
120
121    fn dump(&self) -> ProcedureResult<String> {
122        serde_json::to_string(&self.data).context(ToJsonSnafu)
123    }
124
125    fn lock_key(&self) -> LockKey {
126        let lock_key = vec![
127            CatalogLock::Read(&self.data.catalog).into(),
128            SchemaLock::write(&self.data.catalog, &self.data.schema).into(),
129        ];
130
131        LockKey::new(lock_key)
132    }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
136pub enum CreateDatabaseState {
137    Prepare,
138    CreateMetadata,
139}
140
141#[serde_as]
142#[derive(Debug, Serialize, Deserialize)]
143pub struct CreateDatabaseData {
144    pub state: CreateDatabaseState,
145    pub catalog: String,
146    pub schema: String,
147    pub create_if_not_exists: bool,
148    #[serde_as(deserialize_as = "DefaultOnNull")]
149    pub options: HashMap<String, String>,
150}