common_meta/ddl/
alter_database.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::tracing::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, ResultExt};
21use strum::AsRefStr;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::utils::handle_retry_error;
25use crate::ddl::DdlContext;
26use crate::error::{Result, SchemaNotFoundSnafu};
27use crate::instruction::CacheIdent;
28use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
29use crate::key::DeserializedValueWithBytes;
30use crate::lock_key::{CatalogLock, SchemaLock};
31use crate::rpc::ddl::UnsetDatabaseOption::{self};
32use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
33
34pub struct AlterDatabaseProcedure {
35    pub context: DdlContext,
36    pub data: AlterDatabaseData,
37}
38
39fn build_new_schema_value(
40    mut value: SchemaNameValue,
41    alter_kind: &AlterDatabaseKind,
42) -> Result<SchemaNameValue> {
43    match alter_kind {
44        AlterDatabaseKind::SetDatabaseOptions(options) => {
45            for option in options.0.iter() {
46                match option {
47                    SetDatabaseOption::Ttl(ttl) => {
48                        value.ttl = Some(*ttl);
49                    }
50                }
51            }
52        }
53        AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
54            for key in keys.0.iter() {
55                match key {
56                    UnsetDatabaseOption::Ttl => value.ttl = None,
57                }
58            }
59        }
60    }
61    Ok(value)
62}
63
64impl AlterDatabaseProcedure {
65    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
66
67    pub fn new(task: AlterDatabaseTask, context: DdlContext) -> Result<Self> {
68        Ok(Self {
69            context,
70            data: AlterDatabaseData::new(task)?,
71        })
72    }
73
74    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
75        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
76
77        Ok(Self { context, data })
78    }
79
80    pub async fn on_prepare(&mut self) -> Result<Status> {
81        let value = self
82            .context
83            .table_metadata_manager
84            .schema_manager()
85            .get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
86            .await?;
87
88        ensure!(
89            value.is_some(),
90            SchemaNotFoundSnafu {
91                table_schema: self.data.schema(),
92            }
93        );
94
95        self.data.schema_value = value;
96        self.data.state = AlterDatabaseState::UpdateMetadata;
97
98        Ok(Status::executing(true))
99    }
100
101    pub async fn on_update_metadata(&mut self) -> Result<Status> {
102        let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
103
104        // Safety: schema_value is not None.
105        let current_schema_value = self.data.schema_value.as_ref().unwrap();
106
107        let new_schema_value = build_new_schema_value(
108            current_schema_value.get_inner_ref().clone(),
109            &self.data.kind,
110        )?;
111
112        self.context
113            .table_metadata_manager
114            .schema_manager()
115            .update(schema_name, current_schema_value, &new_schema_value)
116            .await?;
117
118        info!("Updated database metadata for schema {schema_name}");
119        self.data.state = AlterDatabaseState::InvalidateSchemaCache;
120        Ok(Status::executing(true))
121    }
122
123    pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
124        let cache_invalidator = &self.context.cache_invalidator;
125        cache_invalidator
126            .invalidate(
127                &Context::default(),
128                &[CacheIdent::SchemaName(SchemaName {
129                    catalog_name: self.data.catalog().to_string(),
130                    schema_name: self.data.schema().to_string(),
131                })],
132            )
133            .await?;
134
135        Ok(Status::done())
136    }
137}
138
139#[async_trait]
140impl Procedure for AlterDatabaseProcedure {
141    fn type_name(&self) -> &str {
142        Self::TYPE_NAME
143    }
144
145    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
146        match self.data.state {
147            AlterDatabaseState::Prepare => self.on_prepare().await,
148            AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
149            AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
150        }
151        .map_err(handle_retry_error)
152    }
153
154    fn dump(&self) -> ProcedureResult<String> {
155        serde_json::to_string(&self.data).context(ToJsonSnafu)
156    }
157
158    fn lock_key(&self) -> LockKey {
159        let catalog = self.data.catalog();
160        let schema = self.data.schema();
161
162        let lock_key = vec![
163            CatalogLock::Read(catalog).into(),
164            SchemaLock::write(catalog, schema).into(),
165        ];
166
167        LockKey::new(lock_key)
168    }
169}
170
171#[derive(Debug, Serialize, Deserialize, AsRefStr)]
172enum AlterDatabaseState {
173    Prepare,
174    UpdateMetadata,
175    InvalidateSchemaCache,
176}
177
178/// The data of alter database procedure.
179#[derive(Debug, Serialize, Deserialize)]
180pub struct AlterDatabaseData {
181    state: AlterDatabaseState,
182    kind: AlterDatabaseKind,
183    catalog_name: String,
184    schema_name: String,
185    schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
186}
187
188impl AlterDatabaseData {
189    pub fn new(task: AlterDatabaseTask) -> Result<Self> {
190        Ok(Self {
191            state: AlterDatabaseState::Prepare,
192            kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
193            catalog_name: task.alter_expr.catalog_name,
194            schema_name: task.alter_expr.schema_name,
195            schema_value: None,
196        })
197    }
198
199    pub fn catalog(&self) -> &str {
200        &self.catalog_name
201    }
202
203    pub fn schema(&self) -> &str {
204        &self.schema_name
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::ddl::alter_database::build_new_schema_value;
    use crate::key::schema_name::SchemaNameValue;
    use crate::rpc::ddl::{
        AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
        UnsetDatabaseOptions,
    };

    /// Setting a TTL stores it, and unsetting it afterwards clears it again.
    #[test]
    fn test_build_new_schema_value() {
        let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
            SetDatabaseOption::Ttl(Duration::from_secs(10).into()),
        ]));
        let current_schema_value = SchemaNameValue::default();
        let schema_value_with_ttl =
            build_new_schema_value(current_schema_value, &set_ttl).unwrap();
        assert_eq!(
            schema_value_with_ttl.ttl,
            Some(Duration::from_secs(10).into())
        );

        let unset_ttl_alter_kind =
            AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
                UnsetDatabaseOption::Ttl,
            ]));
        // Unset starting from the value that HAS a TTL; otherwise the assertion
        // below would also pass if unsetting did nothing at all.
        let schema_value_without_ttl =
            build_new_schema_value(schema_value_with_ttl, &unset_ttl_alter_kind).unwrap();
        assert_eq!(schema_value_without_ttl.ttl, None);
    }
}