common_meta/ddl/
alter_database.rs1use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::tracing::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, ResultExt};
21use strum::AsRefStr;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::utils::handle_retry_error;
25use crate::ddl::DdlContext;
26use crate::error::{Result, SchemaNotFoundSnafu};
27use crate::instruction::CacheIdent;
28use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
29use crate::key::DeserializedValueWithBytes;
30use crate::lock_key::{CatalogLock, SchemaLock};
31use crate::rpc::ddl::UnsetDatabaseOption::{self};
32use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
33
34pub struct AlterDatabaseProcedure {
35 pub context: DdlContext,
36 pub data: AlterDatabaseData,
37}
38
39fn build_new_schema_value(
40 mut value: SchemaNameValue,
41 alter_kind: &AlterDatabaseKind,
42) -> Result<SchemaNameValue> {
43 match alter_kind {
44 AlterDatabaseKind::SetDatabaseOptions(options) => {
45 for option in options.0.iter() {
46 match option {
47 SetDatabaseOption::Ttl(ttl) => {
48 value.ttl = Some(*ttl);
49 }
50 }
51 }
52 }
53 AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
54 for key in keys.0.iter() {
55 match key {
56 UnsetDatabaseOption::Ttl => value.ttl = None,
57 }
58 }
59 }
60 }
61 Ok(value)
62}
63
64impl AlterDatabaseProcedure {
65 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
66
67 pub fn new(task: AlterDatabaseTask, context: DdlContext) -> Result<Self> {
68 Ok(Self {
69 context,
70 data: AlterDatabaseData::new(task)?,
71 })
72 }
73
74 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
75 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
76
77 Ok(Self { context, data })
78 }
79
80 pub async fn on_prepare(&mut self) -> Result<Status> {
81 let value = self
82 .context
83 .table_metadata_manager
84 .schema_manager()
85 .get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
86 .await?;
87
88 ensure!(
89 value.is_some(),
90 SchemaNotFoundSnafu {
91 table_schema: self.data.schema(),
92 }
93 );
94
95 self.data.schema_value = value;
96 self.data.state = AlterDatabaseState::UpdateMetadata;
97
98 Ok(Status::executing(true))
99 }
100
101 pub async fn on_update_metadata(&mut self) -> Result<Status> {
102 let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
103
104 let current_schema_value = self.data.schema_value.as_ref().unwrap();
106
107 let new_schema_value = build_new_schema_value(
108 current_schema_value.get_inner_ref().clone(),
109 &self.data.kind,
110 )?;
111
112 self.context
113 .table_metadata_manager
114 .schema_manager()
115 .update(schema_name, current_schema_value, &new_schema_value)
116 .await?;
117
118 info!("Updated database metadata for schema {schema_name}");
119 self.data.state = AlterDatabaseState::InvalidateSchemaCache;
120 Ok(Status::executing(true))
121 }
122
123 pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
124 let cache_invalidator = &self.context.cache_invalidator;
125 cache_invalidator
126 .invalidate(
127 &Context::default(),
128 &[CacheIdent::SchemaName(SchemaName {
129 catalog_name: self.data.catalog().to_string(),
130 schema_name: self.data.schema().to_string(),
131 })],
132 )
133 .await?;
134
135 Ok(Status::done())
136 }
137}
138
139#[async_trait]
140impl Procedure for AlterDatabaseProcedure {
141 fn type_name(&self) -> &str {
142 Self::TYPE_NAME
143 }
144
145 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
146 match self.data.state {
147 AlterDatabaseState::Prepare => self.on_prepare().await,
148 AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
149 AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
150 }
151 .map_err(handle_retry_error)
152 }
153
154 fn dump(&self) -> ProcedureResult<String> {
155 serde_json::to_string(&self.data).context(ToJsonSnafu)
156 }
157
158 fn lock_key(&self) -> LockKey {
159 let catalog = self.data.catalog();
160 let schema = self.data.schema();
161
162 let lock_key = vec![
163 CatalogLock::Read(catalog).into(),
164 SchemaLock::write(catalog, schema).into(),
165 ];
166
167 LockKey::new(lock_key)
168 }
169}
170
171#[derive(Debug, Serialize, Deserialize, AsRefStr)]
172enum AlterDatabaseState {
173 Prepare,
174 UpdateMetadata,
175 InvalidateSchemaCache,
176}
177
178#[derive(Debug, Serialize, Deserialize)]
180pub struct AlterDatabaseData {
181 state: AlterDatabaseState,
182 kind: AlterDatabaseKind,
183 catalog_name: String,
184 schema_name: String,
185 schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
186}
187
188impl AlterDatabaseData {
189 pub fn new(task: AlterDatabaseTask) -> Result<Self> {
190 Ok(Self {
191 state: AlterDatabaseState::Prepare,
192 kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
193 catalog_name: task.alter_expr.catalog_name,
194 schema_name: task.alter_expr.schema_name,
195 schema_value: None,
196 })
197 }
198
199 pub fn catalog(&self) -> &str {
200 &self.catalog_name
201 }
202
203 pub fn schema(&self) -> &str {
204 &self.schema_name
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use std::time::Duration;
211
212 use crate::ddl::alter_database::build_new_schema_value;
213 use crate::key::schema_name::SchemaNameValue;
214 use crate::rpc::ddl::{
215 AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
216 UnsetDatabaseOptions,
217 };
218
219 #[test]
220 fn test_build_new_schema_value() {
221 let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
222 SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
223 ]));
224 let current_schema_value = SchemaNameValue::default();
225 let new_schema_value =
226 build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
227 assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));
228
229 let unset_ttl_alter_kind =
230 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
231 UnsetDatabaseOption::Ttl,
232 ]));
233 let new_schema_value =
234 build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
235 assert_eq!(new_schema_value.ttl, None);
236 }
237}