common_meta/ddl/
comment_on.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use chrono::Utc;
17use common_catalog::format_full_table_name;
18use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
19use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
20use common_telemetry::tracing::info;
21use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::storage::TableId;
25use strum::AsRefStr;
26use table::metadata::RawTableInfo;
27use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
28use table::table_name::TableName;
29
30use crate::cache_invalidator::Context;
31use crate::ddl::DdlContext;
32use crate::ddl::utils::map_to_procedure_error;
33use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
34use crate::instruction::CacheIdent;
35use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
36use crate::key::table_info::{TableInfoKey, TableInfoValue};
37use crate::key::table_name::TableNameKey;
38use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
39use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
40use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
41use crate::rpc::store::PutRequest;
42
43pub struct CommentOnProcedure {
44    pub context: DdlContext,
45    pub data: CommentOnData,
46}
47
48impl CommentOnProcedure {
49    pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
50
51    pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
52        Self {
53            context,
54            data: CommentOnData::new(task),
55        }
56    }
57
58    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
59        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
60
61        Ok(Self { context, data })
62    }
63
64    pub async fn on_prepare(&mut self) -> Result<Status> {
65        match self.data.object_type {
66            CommentObjectType::Table | CommentObjectType::Column => {
67                self.prepare_table_or_column().await?;
68            }
69            CommentObjectType::Flow => {
70                self.prepare_flow().await?;
71            }
72        }
73
74        // Fast path: if comment is unchanged, skip update
75        if self.data.is_unchanged {
76            let object_desc = match self.data.object_type {
77                CommentObjectType::Table => format!(
78                    "table {}",
79                    format_full_table_name(
80                        &self.data.catalog_name,
81                        &self.data.schema_name,
82                        &self.data.object_name,
83                    )
84                ),
85                CommentObjectType::Column => format!(
86                    "column {}.{}",
87                    format_full_table_name(
88                        &self.data.catalog_name,
89                        &self.data.schema_name,
90                        &self.data.object_name,
91                    ),
92                    self.data.column_name.as_ref().unwrap()
93                ),
94                CommentObjectType::Flow => {
95                    format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
96                }
97            };
98            info!("Comment unchanged for {}, skipping update", object_desc);
99            return Ok(Status::done());
100        }
101
102        self.data.state = CommentOnState::UpdateMetadata;
103        Ok(Status::executing(true))
104    }
105
106    async fn prepare_table_or_column(&mut self) -> Result<()> {
107        let table_name_key = TableNameKey::new(
108            &self.data.catalog_name,
109            &self.data.schema_name,
110            &self.data.object_name,
111        );
112
113        let table_id = self
114            .context
115            .table_metadata_manager
116            .table_name_manager()
117            .get(table_name_key)
118            .await?
119            .with_context(|| TableNotFoundSnafu {
120                table_name: format_full_table_name(
121                    &self.data.catalog_name,
122                    &self.data.schema_name,
123                    &self.data.object_name,
124                ),
125            })?
126            .table_id();
127
128        let table_info = self
129            .context
130            .table_metadata_manager
131            .table_info_manager()
132            .get(table_id)
133            .await?
134            .with_context(|| TableNotFoundSnafu {
135                table_name: format_full_table_name(
136                    &self.data.catalog_name,
137                    &self.data.schema_name,
138                    &self.data.object_name,
139                ),
140            })?;
141
142        // For column comments, validate the column exists
143        if self.data.object_type == CommentObjectType::Column {
144            let column_name = self.data.column_name.as_ref().unwrap();
145            let column_exists = table_info
146                .table_info
147                .meta
148                .schema
149                .column_schemas
150                .iter()
151                .any(|col| &col.name == column_name);
152
153            ensure!(
154                column_exists,
155                ColumnNotFoundSnafu {
156                    column_name,
157                    column_id: 0u32, // column_id is not known here
158                }
159            );
160        }
161
162        self.data.table_id = Some(table_id);
163
164        // Check if comment is unchanged for early exit optimization
165        match self.data.object_type {
166            CommentObjectType::Table => {
167                let current_comment = &table_info.table_info.desc;
168                if &self.data.comment == current_comment {
169                    self.data.is_unchanged = true;
170                }
171            }
172            CommentObjectType::Column => {
173                let column_name = self.data.column_name.as_ref().unwrap();
174                let column_schema = table_info
175                    .table_info
176                    .meta
177                    .schema
178                    .column_schemas
179                    .iter()
180                    .find(|col| &col.name == column_name)
181                    .unwrap(); // Safe: validated above
182
183                let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
184                if self.data.comment.as_deref() == current_comment.map(String::as_str) {
185                    self.data.is_unchanged = true;
186                }
187            }
188            CommentObjectType::Flow => {
189                // this branch is handled in `prepare_flow`
190            }
191        }
192
193        self.data.table_info = Some(table_info);
194
195        Ok(())
196    }
197
198    async fn prepare_flow(&mut self) -> Result<()> {
199        let flow_name_value = self
200            .context
201            .flow_metadata_manager
202            .flow_name_manager()
203            .get(&self.data.catalog_name, &self.data.object_name)
204            .await?
205            .with_context(|| FlowNotFoundSnafu {
206                flow_name: &self.data.object_name,
207            })?;
208
209        let flow_id = flow_name_value.flow_id();
210        let flow_info = self
211            .context
212            .flow_metadata_manager
213            .flow_info_manager()
214            .get_raw(flow_id)
215            .await?
216            .with_context(|| FlowNotFoundSnafu {
217                flow_name: &self.data.object_name,
218            })?;
219
220        self.data.flow_id = Some(flow_id);
221
222        // Check if comment is unchanged for early exit optimization
223        let current_comment = &flow_info.get_inner_ref().comment;
224        let new_comment = self.data.comment.as_deref().unwrap_or("");
225        if new_comment == current_comment.as_str() {
226            self.data.is_unchanged = true;
227        }
228
229        self.data.flow_info = Some(flow_info);
230
231        Ok(())
232    }
233
234    pub async fn on_update_metadata(&mut self) -> Result<Status> {
235        match self.data.object_type {
236            CommentObjectType::Table => {
237                self.update_table_comment().await?;
238            }
239            CommentObjectType::Column => {
240                self.update_column_comment().await?;
241            }
242            CommentObjectType::Flow => {
243                self.update_flow_comment().await?;
244            }
245        }
246
247        self.data.state = CommentOnState::InvalidateCache;
248        Ok(Status::executing(true))
249    }
250
251    async fn update_table_comment(&mut self) -> Result<()> {
252        let table_info_value = self.data.table_info.as_ref().unwrap();
253        let mut new_table_info = table_info_value.table_info.clone();
254
255        new_table_info.desc = self.data.comment.clone();
256
257        // Sync comment to table options
258        sync_table_comment_option(
259            &mut new_table_info.meta.options,
260            new_table_info.desc.as_deref(),
261        );
262
263        self.update_table_info(table_info_value, new_table_info)
264            .await?;
265
266        info!(
267            "Updated comment for table {}.{}.{}",
268            self.data.catalog_name, self.data.schema_name, self.data.object_name
269        );
270
271        Ok(())
272    }
273
274    async fn update_column_comment(&mut self) -> Result<()> {
275        let table_info_value = self.data.table_info.as_ref().unwrap();
276        let mut new_table_info = table_info_value.table_info.clone();
277
278        let column_name = self.data.column_name.as_ref().unwrap();
279        let column_schema = new_table_info
280            .meta
281            .schema
282            .column_schemas
283            .iter_mut()
284            .find(|col| &col.name == column_name)
285            .unwrap(); // Safe: validated in prepare
286
287        update_column_comment_metadata(column_schema, self.data.comment.clone());
288
289        self.update_table_info(table_info_value, new_table_info)
290            .await?;
291
292        info!(
293            "Updated comment for column {}.{}.{}.{}",
294            self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
295        );
296
297        Ok(())
298    }
299
300    async fn update_flow_comment(&mut self) -> Result<()> {
301        let flow_id = self.data.flow_id.unwrap();
302        let flow_info_value = self.data.flow_info.as_ref().unwrap();
303
304        let mut new_flow_info = flow_info_value.get_inner_ref().clone();
305        new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
306        new_flow_info.updated_time = Utc::now();
307
308        let raw_value = new_flow_info.try_as_raw_value()?;
309
310        self.context
311            .table_metadata_manager
312            .kv_backend()
313            .put(
314                PutRequest::new()
315                    .with_key(FlowInfoKey::new(flow_id).to_bytes())
316                    .with_value(raw_value),
317            )
318            .await?;
319
320        info!(
321            "Updated comment for flow {}.{}",
322            self.data.catalog_name, self.data.object_name
323        );
324
325        Ok(())
326    }
327
328    async fn update_table_info(
329        &self,
330        current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
331        new_table_info: RawTableInfo,
332    ) -> Result<()> {
333        let table_id = current_table_info.table_info.ident.table_id;
334        let new_table_info_value = current_table_info.update(new_table_info);
335        let raw_value = new_table_info_value.try_as_raw_value()?;
336
337        self.context
338            .table_metadata_manager
339            .kv_backend()
340            .put(
341                PutRequest::new()
342                    .with_key(TableInfoKey::new(table_id).to_bytes())
343                    .with_value(raw_value),
344            )
345            .await?;
346
347        Ok(())
348    }
349
350    pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
351        let cache_invalidator = &self.context.cache_invalidator;
352
353        match self.data.object_type {
354            CommentObjectType::Table | CommentObjectType::Column => {
355                let table_id = self.data.table_id.unwrap();
356                let table_name = TableName::new(
357                    self.data.catalog_name.clone(),
358                    self.data.schema_name.clone(),
359                    self.data.object_name.clone(),
360                );
361
362                let cache_ident = vec![
363                    CacheIdent::TableId(table_id),
364                    CacheIdent::TableName(table_name),
365                ];
366
367                cache_invalidator
368                    .invalidate(&Context::default(), &cache_ident)
369                    .await?;
370            }
371            CommentObjectType::Flow => {
372                let flow_id = self.data.flow_id.unwrap();
373                let cache_ident = vec![CacheIdent::FlowId(flow_id)];
374
375                cache_invalidator
376                    .invalidate(&Context::default(), &cache_ident)
377                    .await?;
378            }
379        }
380
381        Ok(Status::done())
382    }
383}
384
385#[async_trait]
386impl Procedure for CommentOnProcedure {
387    fn type_name(&self) -> &str {
388        Self::TYPE_NAME
389    }
390
391    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
392        match self.data.state {
393            CommentOnState::Prepare => self.on_prepare().await,
394            CommentOnState::UpdateMetadata => self.on_update_metadata().await,
395            CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
396        }
397        .map_err(map_to_procedure_error)
398    }
399
400    fn dump(&self) -> ProcedureResult<String> {
401        serde_json::to_string(&self.data).context(ToJsonSnafu)
402    }
403
404    fn lock_key(&self) -> LockKey {
405        let catalog = &self.data.catalog_name;
406        let schema = &self.data.schema_name;
407
408        let lock_key = match self.data.object_type {
409            CommentObjectType::Table | CommentObjectType::Column => {
410                vec![
411                    CatalogLock::Read(catalog).into(),
412                    SchemaLock::read(catalog, schema).into(),
413                    TableNameLock::new(catalog, schema, &self.data.object_name).into(),
414                ]
415            }
416            CommentObjectType::Flow => {
417                vec![
418                    CatalogLock::Read(catalog).into(),
419                    FlowNameLock::new(catalog, &self.data.object_name).into(),
420                ]
421            }
422        };
423
424        LockKey::new(lock_key)
425    }
426}
427
428#[derive(Debug, Serialize, Deserialize, AsRefStr)]
429enum CommentOnState {
430    Prepare,
431    UpdateMetadata,
432    InvalidateCache,
433}
434
435/// The data of comment on procedure.
436#[derive(Debug, Serialize, Deserialize)]
437pub struct CommentOnData {
438    state: CommentOnState,
439    catalog_name: String,
440    schema_name: String,
441    object_type: CommentObjectType,
442    object_name: String,
443    /// Column name (only for Column comments)
444    column_name: Option<String>,
445    comment: Option<String>,
446    /// Cached table ID (for Table/Column)
447    #[serde(skip_serializing_if = "Option::is_none")]
448    table_id: Option<TableId>,
449    /// Cached table info (for Table/Column)
450    #[serde(skip)]
451    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
452    /// Cached flow ID (for Flow)
453    #[serde(skip_serializing_if = "Option::is_none")]
454    flow_id: Option<FlowId>,
455    /// Cached flow info (for Flow)
456    #[serde(skip)]
457    flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
458    /// Whether the comment is unchanged (optimization for early exit)
459    #[serde(skip)]
460    is_unchanged: bool,
461}
462
463impl CommentOnData {
464    pub fn new(task: CommentOnTask) -> Self {
465        Self {
466            state: CommentOnState::Prepare,
467            catalog_name: task.catalog_name,
468            schema_name: task.schema_name,
469            object_type: task.object_type,
470            object_name: task.object_name,
471            column_name: task.column_name,
472            comment: task.comment,
473            table_id: None,
474            table_info: None,
475            flow_id: None,
476            flow_info: None,
477            is_unchanged: false,
478        }
479    }
480}
481
482fn update_column_comment_metadata(
483    column_schema: &mut datatypes::schema::ColumnSchema,
484    comment: Option<String>,
485) {
486    match comment {
487        Some(value) => {
488            column_schema
489                .mut_metadata()
490                .insert(COLUMN_COMMENT_KEY.to_string(), value);
491        }
492        None => {
493            column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
494        }
495    }
496}
497
498fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
499    match comment {
500        Some(value) => {
501            options
502                .extra_options
503                .insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
504        }
505        None => {
506            options.extra_options.remove(TABLE_COMMENT_KEY);
507        }
508    }
509}