common_meta/ddl/
create_database.rs1use std::collections::HashMap;
16
17use async_trait::async_trait;
18use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
19use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
20use serde::{Deserialize, Serialize};
21use serde_with::{serde_as, DefaultOnNull};
22use snafu::{ensure, ResultExt};
23use strum::AsRefStr;
24
25use crate::ddl::utils::handle_retry_error;
26use crate::ddl::DdlContext;
27use crate::error::{self, Result};
28use crate::key::schema_name::{SchemaNameKey, SchemaNameValue};
29use crate::lock_key::{CatalogLock, SchemaLock};
30
31pub struct CreateDatabaseProcedure {
32 pub context: DdlContext,
33 pub data: CreateDatabaseData,
34}
35
36impl CreateDatabaseProcedure {
37 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateDatabase";
38
39 pub fn new(
40 catalog: String,
41 schema: String,
42 create_if_not_exists: bool,
43 options: HashMap<String, String>,
44 context: DdlContext,
45 ) -> Self {
46 Self {
47 context,
48 data: CreateDatabaseData {
49 state: CreateDatabaseState::Prepare,
50 catalog,
51 schema,
52 create_if_not_exists,
53 options,
54 },
55 }
56 }
57
58 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
59 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
60
61 Ok(Self { context, data })
62 }
63
64 pub async fn on_prepare(&mut self) -> Result<Status> {
65 let exists = self
66 .context
67 .table_metadata_manager
68 .schema_manager()
69 .exists(SchemaNameKey::new(&self.data.catalog, &self.data.schema))
70 .await?;
71
72 if exists && self.data.create_if_not_exists {
73 return Ok(Status::done());
74 }
75
76 ensure!(
77 !exists,
78 error::SchemaAlreadyExistsSnafu {
79 catalog: &self.data.catalog,
80 schema: &self.data.schema,
81 }
82 );
83
84 self.data.state = CreateDatabaseState::CreateMetadata;
85 Ok(Status::executing(true))
86 }
87
88 pub async fn on_create_metadata(&mut self) -> Result<Status> {
89 let value: SchemaNameValue = (&self.data.options).try_into()?;
90
91 self.context
92 .table_metadata_manager
93 .schema_manager()
94 .create(
95 SchemaNameKey::new(&self.data.catalog, &self.data.schema),
96 Some(value),
97 self.data.create_if_not_exists,
98 )
99 .await?;
100
101 Ok(Status::done())
102 }
103}
104
105#[async_trait]
106impl Procedure for CreateDatabaseProcedure {
107 fn type_name(&self) -> &str {
108 Self::TYPE_NAME
109 }
110
111 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
112 let state = &self.data.state;
113
114 match state {
115 CreateDatabaseState::Prepare => self.on_prepare().await,
116 CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
117 }
118 .map_err(handle_retry_error)
119 }
120
121 fn dump(&self) -> ProcedureResult<String> {
122 serde_json::to_string(&self.data).context(ToJsonSnafu)
123 }
124
125 fn lock_key(&self) -> LockKey {
126 let lock_key = vec![
127 CatalogLock::Read(&self.data.catalog).into(),
128 SchemaLock::write(&self.data.catalog, &self.data.schema).into(),
129 ];
130
131 LockKey::new(lock_key)
132 }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
136pub enum CreateDatabaseState {
137 Prepare,
138 CreateMetadata,
139}
140
141#[serde_as]
142#[derive(Debug, Serialize, Deserialize)]
143pub struct CreateDatabaseData {
144 pub state: CreateDatabaseState,
145 pub catalog: String,
146 pub schema: String,
147 pub create_if_not_exists: bool,
148 #[serde_as(deserialize_as = "DefaultOnNull")]
149 pub options: HashMap<String, String>,
150}