common_meta/ddl/
alter_database.rs1use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::tracing::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ResultExt, ensure};
21use strum::AsRefStr;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::DdlContext;
25use crate::ddl::utils::map_to_procedure_error;
26use crate::error::{Result, SchemaNotFoundSnafu};
27use crate::instruction::CacheIdent;
28use crate::key::DeserializedValueWithBytes;
29use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
30use crate::lock_key::{CatalogLock, SchemaLock};
31use crate::rpc::ddl::UnsetDatabaseOption::{self};
32use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
33
34pub struct AlterDatabaseProcedure {
35 pub context: DdlContext,
36 pub data: AlterDatabaseData,
37}
38
39fn build_new_schema_value(
40 mut value: SchemaNameValue,
41 alter_kind: &AlterDatabaseKind,
42) -> Result<SchemaNameValue> {
43 match alter_kind {
44 AlterDatabaseKind::SetDatabaseOptions(options) => {
45 for option in options.0.iter() {
46 match option {
47 SetDatabaseOption::Ttl(ttl) => {
48 value.ttl = Some(*ttl);
49 }
50 SetDatabaseOption::Other(key, val) => {
51 value.extra_options.insert(key.clone(), val.clone());
52 }
53 }
54 }
55 }
56 AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
57 for key in keys.0.iter() {
58 match key {
59 UnsetDatabaseOption::Ttl => value.ttl = None,
60 UnsetDatabaseOption::Other(key) => {
61 value.extra_options.remove(key);
62 }
63 }
64 }
65 }
66 }
67 Ok(value)
68}
69
70impl AlterDatabaseProcedure {
71 pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
72
73 pub fn new(task: AlterDatabaseTask, context: DdlContext) -> Result<Self> {
74 Ok(Self {
75 context,
76 data: AlterDatabaseData::new(task)?,
77 })
78 }
79
80 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82
83 Ok(Self { context, data })
84 }
85
86 pub async fn on_prepare(&mut self) -> Result<Status> {
87 let value = self
88 .context
89 .table_metadata_manager
90 .schema_manager()
91 .get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
92 .await?;
93
94 ensure!(
95 value.is_some(),
96 SchemaNotFoundSnafu {
97 table_schema: self.data.schema(),
98 }
99 );
100
101 self.data.schema_value = value;
102 self.data.state = AlterDatabaseState::UpdateMetadata;
103
104 Ok(Status::executing(true))
105 }
106
107 pub async fn on_update_metadata(&mut self) -> Result<Status> {
108 let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
109
110 let current_schema_value = self.data.schema_value.as_ref().unwrap();
112
113 let new_schema_value = build_new_schema_value(
114 current_schema_value.get_inner_ref().clone(),
115 &self.data.kind,
116 )?;
117
118 self.context
119 .table_metadata_manager
120 .schema_manager()
121 .update(schema_name, current_schema_value, &new_schema_value)
122 .await?;
123
124 info!("Updated database metadata for schema {schema_name}");
125 self.data.state = AlterDatabaseState::InvalidateSchemaCache;
126 Ok(Status::executing(true))
127 }
128
129 pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
130 let cache_invalidator = &self.context.cache_invalidator;
131 cache_invalidator
132 .invalidate(
133 &Context::default(),
134 &[CacheIdent::SchemaName(SchemaName {
135 catalog_name: self.data.catalog().to_string(),
136 schema_name: self.data.schema().to_string(),
137 })],
138 )
139 .await?;
140
141 Ok(Status::done())
142 }
143}
144
145#[async_trait]
146impl Procedure for AlterDatabaseProcedure {
147 fn type_name(&self) -> &str {
148 Self::TYPE_NAME
149 }
150
151 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
152 match self.data.state {
153 AlterDatabaseState::Prepare => self.on_prepare().await,
154 AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
155 AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
156 }
157 .map_err(map_to_procedure_error)
158 }
159
160 fn dump(&self) -> ProcedureResult<String> {
161 serde_json::to_string(&self.data).context(ToJsonSnafu)
162 }
163
164 fn lock_key(&self) -> LockKey {
165 let catalog = self.data.catalog();
166 let schema = self.data.schema();
167
168 let lock_key = vec![
169 CatalogLock::Read(catalog).into(),
170 SchemaLock::write(catalog, schema).into(),
171 ];
172
173 LockKey::new(lock_key)
174 }
175}
176
177#[derive(Debug, Serialize, Deserialize, AsRefStr)]
178enum AlterDatabaseState {
179 Prepare,
180 UpdateMetadata,
181 InvalidateSchemaCache,
182}
183
184#[derive(Debug, Serialize, Deserialize)]
186pub struct AlterDatabaseData {
187 state: AlterDatabaseState,
188 kind: AlterDatabaseKind,
189 catalog_name: String,
190 schema_name: String,
191 schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
192}
193
194impl AlterDatabaseData {
195 pub fn new(task: AlterDatabaseTask) -> Result<Self> {
196 Ok(Self {
197 state: AlterDatabaseState::Prepare,
198 kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
199 catalog_name: task.alter_expr.catalog_name,
200 schema_name: task.alter_expr.schema_name,
201 schema_value: None,
202 })
203 }
204
205 pub fn catalog(&self) -> &str {
206 &self.catalog_name
207 }
208
209 pub fn schema(&self) -> &str {
210 &self.schema_name
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use std::time::Duration;
217
218 use crate::ddl::alter_database::build_new_schema_value;
219 use crate::key::schema_name::SchemaNameValue;
220 use crate::rpc::ddl::{
221 AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
222 UnsetDatabaseOptions,
223 };
224
225 #[test]
226 fn test_build_new_schema_value() {
227 let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
228 SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
229 ]));
230 let current_schema_value = SchemaNameValue::default();
231 let new_schema_value =
232 build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
233 assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));
234
235 let unset_ttl_alter_kind =
236 AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
237 UnsetDatabaseOption::Ttl,
238 ]));
239 let new_schema_value =
240 build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
241 assert_eq!(new_schema_value.ttl, None);
242 }
243
244 #[test]
245 fn test_build_new_schema_value_with_compaction_options() {
246 let set_compaction = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
247 SetDatabaseOption::Other("compaction.type".to_string(), "twcs".to_string()),
248 SetDatabaseOption::Other("compaction.twcs.time_window".to_string(), "1d".to_string()),
249 ]));
250
251 let current_schema_value = SchemaNameValue::default();
252 let new_schema_value =
253 build_new_schema_value(current_schema_value.clone(), &set_compaction).unwrap();
254
255 assert_eq!(
256 new_schema_value.extra_options.get("compaction.type"),
257 Some(&"twcs".to_string())
258 );
259 assert_eq!(
260 new_schema_value
261 .extra_options
262 .get("compaction.twcs.time_window"),
263 Some(&"1d".to_string())
264 );
265
266 let unset_compaction = AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
267 UnsetDatabaseOption::Other("compaction.type".to_string()),
268 ]));
269
270 let new_schema_value = build_new_schema_value(new_schema_value, &unset_compaction).unwrap();
271
272 assert_eq!(new_schema_value.extra_options.get("compaction.type"), None);
273 assert_eq!(
274 new_schema_value
275 .extra_options
276 .get("compaction.twcs.time_window"),
277 Some(&"1d".to_string())
278 );
279 }
280}