common_meta/ddl/
alter_database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::tracing::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ResultExt, ensure};
21use strum::AsRefStr;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::DdlContext;
25use crate::ddl::utils::map_to_procedure_error;
26use crate::error::{Result, SchemaNotFoundSnafu};
27use crate::instruction::CacheIdent;
28use crate::key::DeserializedValueWithBytes;
29use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
30use crate::lock_key::{CatalogLock, SchemaLock};
31use crate::rpc::ddl::UnsetDatabaseOption::{self};
32use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
33
/// Procedure that applies an [`AlterDatabaseTask`] to a schema's stored metadata.
///
/// Execution is driven by the state kept in `data`: prepare, update the
/// metadata, then invalidate the schema cache.
pub struct AlterDatabaseProcedure {
    /// DDL context; this procedure uses its `table_metadata_manager` and
    /// `cache_invalidator`.
    pub context: DdlContext,
    /// Procedure data, serialized by `dump` and restored by `from_json`.
    pub data: AlterDatabaseData,
}
38
39fn build_new_schema_value(
40    mut value: SchemaNameValue,
41    alter_kind: &AlterDatabaseKind,
42) -> Result<SchemaNameValue> {
43    match alter_kind {
44        AlterDatabaseKind::SetDatabaseOptions(options) => {
45            for option in options.0.iter() {
46                match option {
47                    SetDatabaseOption::Ttl(ttl) => {
48                        value.ttl = Some(*ttl);
49                    }
50                    SetDatabaseOption::Other(key, val) => {
51                        value.extra_options.insert(key.clone(), val.clone());
52                    }
53                }
54            }
55        }
56        AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
57            for key in keys.0.iter() {
58                match key {
59                    UnsetDatabaseOption::Ttl => value.ttl = None,
60                    UnsetDatabaseOption::Other(key) => {
61                        value.extra_options.remove(key);
62                    }
63                }
64            }
65        }
66    }
67    Ok(value)
68}
69
70impl AlterDatabaseProcedure {
71    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
72
73    pub fn new(task: AlterDatabaseTask, context: DdlContext) -> Result<Self> {
74        Ok(Self {
75            context,
76            data: AlterDatabaseData::new(task)?,
77        })
78    }
79
80    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
82
83        Ok(Self { context, data })
84    }
85
86    pub async fn on_prepare(&mut self) -> Result<Status> {
87        let value = self
88            .context
89            .table_metadata_manager
90            .schema_manager()
91            .get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
92            .await?;
93
94        ensure!(
95            value.is_some(),
96            SchemaNotFoundSnafu {
97                table_schema: self.data.schema(),
98            }
99        );
100
101        self.data.schema_value = value;
102        self.data.state = AlterDatabaseState::UpdateMetadata;
103
104        Ok(Status::executing(true))
105    }
106
107    pub async fn on_update_metadata(&mut self) -> Result<Status> {
108        let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
109
110        // Safety: schema_value is not None.
111        let current_schema_value = self.data.schema_value.as_ref().unwrap();
112
113        let new_schema_value = build_new_schema_value(
114            current_schema_value.get_inner_ref().clone(),
115            &self.data.kind,
116        )?;
117
118        self.context
119            .table_metadata_manager
120            .schema_manager()
121            .update(schema_name, current_schema_value, &new_schema_value)
122            .await?;
123
124        info!("Updated database metadata for schema {schema_name}");
125        self.data.state = AlterDatabaseState::InvalidateSchemaCache;
126        Ok(Status::executing(true))
127    }
128
129    pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
130        let cache_invalidator = &self.context.cache_invalidator;
131        cache_invalidator
132            .invalidate(
133                &Context::default(),
134                &[CacheIdent::SchemaName(SchemaName {
135                    catalog_name: self.data.catalog().to_string(),
136                    schema_name: self.data.schema().to_string(),
137                })],
138            )
139            .await?;
140
141        Ok(Status::done())
142    }
143}
144
145#[async_trait]
146impl Procedure for AlterDatabaseProcedure {
147    fn type_name(&self) -> &str {
148        Self::TYPE_NAME
149    }
150
151    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
152        match self.data.state {
153            AlterDatabaseState::Prepare => self.on_prepare().await,
154            AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
155            AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
156        }
157        .map_err(map_to_procedure_error)
158    }
159
160    fn dump(&self) -> ProcedureResult<String> {
161        serde_json::to_string(&self.data).context(ToJsonSnafu)
162    }
163
164    fn lock_key(&self) -> LockKey {
165        let catalog = self.data.catalog();
166        let schema = self.data.schema();
167
168        let lock_key = vec![
169            CatalogLock::Read(catalog).into(),
170            SchemaLock::write(catalog, schema).into(),
171        ];
172
173        LockKey::new(lock_key)
174    }
175}
176
/// Execution states of [`AlterDatabaseProcedure`], listed in the order they run.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
    /// Load the current schema value and check that the schema exists.
    Prepare,
    /// Write the altered schema value through the schema manager.
    UpdateMetadata,
    /// Invalidate the cache entry for the schema name; the last step.
    InvalidateSchemaCache,
}
183
/// The data of alter database procedure.
///
/// This is the part of the procedure that is serialized to JSON by `dump` and
/// read back by `AlterDatabaseProcedure::from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
    /// The step the procedure will execute next.
    state: AlterDatabaseState,
    /// The alteration to apply (set or unset database options).
    kind: AlterDatabaseKind,
    /// Catalog name taken from the alter expression.
    catalog_name: String,
    /// Schema name taken from the alter expression.
    schema_name: String,
    /// Schema value fetched during the prepare step; `None` until then.
    schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
}
193
194impl AlterDatabaseData {
195    pub fn new(task: AlterDatabaseTask) -> Result<Self> {
196        Ok(Self {
197            state: AlterDatabaseState::Prepare,
198            kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
199            catalog_name: task.alter_expr.catalog_name,
200            schema_name: task.alter_expr.schema_name,
201            schema_value: None,
202        })
203    }
204
205    pub fn catalog(&self) -> &str {
206        &self.catalog_name
207    }
208
209    pub fn schema(&self) -> &str {
210        &self.schema_name
211    }
212}
213
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::ddl::alter_database::build_new_schema_value;
    use crate::key::schema_name::SchemaNameValue;
    use crate::rpc::ddl::{
        AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
        UnsetDatabaseOptions,
    };

    /// Setting the TTL stores it; unsetting it afterwards clears it again.
    #[test]
    fn test_build_new_schema_value() {
        let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
            SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
        ]));
        let current_schema_value = SchemaNameValue::default();
        let new_schema_value = build_new_schema_value(current_schema_value, &set_ttl).unwrap();
        assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10).into()));

        // Unset on the value that actually carries a TTL; unsetting on a default
        // value would pass even if the unset branch did nothing.
        let unset_ttl_alter_kind =
            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
                UnsetDatabaseOption::Ttl,
            ]));
        let new_schema_value =
            build_new_schema_value(new_schema_value, &unset_ttl_alter_kind).unwrap();
        assert_eq!(new_schema_value.ttl, None);
    }

    /// Extra options are inserted by key, and unsetting one key leaves the
    /// others untouched.
    #[test]
    fn test_build_new_schema_value_with_compaction_options() {
        let set_compaction = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
            SetDatabaseOption::Other("compaction.type".to_string(), "twcs".to_string()),
            SetDatabaseOption::Other("compaction.twcs.time_window".to_string(), "1d".to_string()),
        ]));

        let current_schema_value = SchemaNameValue::default();
        let new_schema_value =
            build_new_schema_value(current_schema_value, &set_compaction).unwrap();

        assert_eq!(
            new_schema_value.extra_options.get("compaction.type"),
            Some(&"twcs".to_string())
        );
        assert_eq!(
            new_schema_value
                .extra_options
                .get("compaction.twcs.time_window"),
            Some(&"1d".to_string())
        );

        let unset_compaction = AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
            UnsetDatabaseOption::Other("compaction.type".to_string()),
        ]));

        let new_schema_value = build_new_schema_value(new_schema_value, &unset_compaction).unwrap();

        assert_eq!(new_schema_value.extra_options.get("compaction.type"), None);
        assert_eq!(
            new_schema_value
                .extra_options
                .get("compaction.twcs.time_window"),
            Some(&"1d".to_string())
        );
    }
}