common_meta/ddl/
comment_on.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use chrono::Utc;
19use common_catalog::format_full_table_name;
20use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
21use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
22use common_telemetry::tracing::info;
23use datatypes::schema::{COMMENT_KEY as COLUMN_COMMENT_KEY, Schema};
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::storage::TableId;
27use strum::AsRefStr;
28use table::metadata::TableInfo;
29use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::DdlContext;
34use crate::ddl::utils::map_to_procedure_error;
35use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
36use crate::instruction::CacheIdent;
37use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
38use crate::key::table_info::{TableInfoKey, TableInfoValue};
39use crate::key::table_name::TableNameKey;
40use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
41use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
42use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
43use crate::rpc::store::PutRequest;
44
45pub struct CommentOnProcedure {
46    pub context: DdlContext,
47    pub data: CommentOnData,
48}
49
50impl CommentOnProcedure {
51    pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
52
53    pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
54        Self {
55            context,
56            data: CommentOnData::new(task),
57        }
58    }
59
60    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
61        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
62
63        Ok(Self { context, data })
64    }
65
66    pub async fn on_prepare(&mut self) -> Result<Status> {
67        match self.data.object_type {
68            CommentObjectType::Table | CommentObjectType::Column => {
69                self.prepare_table_or_column().await?;
70            }
71            CommentObjectType::Flow => {
72                self.prepare_flow().await?;
73            }
74        }
75
76        // Fast path: if comment is unchanged, skip update
77        if self.data.is_unchanged {
78            let object_desc = match self.data.object_type {
79                CommentObjectType::Table => format!(
80                    "table {}",
81                    format_full_table_name(
82                        &self.data.catalog_name,
83                        &self.data.schema_name,
84                        &self.data.object_name,
85                    )
86                ),
87                CommentObjectType::Column => format!(
88                    "column {}.{}",
89                    format_full_table_name(
90                        &self.data.catalog_name,
91                        &self.data.schema_name,
92                        &self.data.object_name,
93                    ),
94                    self.data.column_name.as_ref().unwrap()
95                ),
96                CommentObjectType::Flow => {
97                    format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
98                }
99            };
100            info!("Comment unchanged for {}, skipping update", object_desc);
101            return Ok(Status::done());
102        }
103
104        self.data.state = CommentOnState::UpdateMetadata;
105        Ok(Status::executing(true))
106    }
107
108    async fn prepare_table_or_column(&mut self) -> Result<()> {
109        let table_name_key = TableNameKey::new(
110            &self.data.catalog_name,
111            &self.data.schema_name,
112            &self.data.object_name,
113        );
114
115        let table_id = self
116            .context
117            .table_metadata_manager
118            .table_name_manager()
119            .get(table_name_key)
120            .await?
121            .with_context(|| TableNotFoundSnafu {
122                table_name: format_full_table_name(
123                    &self.data.catalog_name,
124                    &self.data.schema_name,
125                    &self.data.object_name,
126                ),
127            })?
128            .table_id();
129
130        let table_info = self
131            .context
132            .table_metadata_manager
133            .table_info_manager()
134            .get(table_id)
135            .await?
136            .with_context(|| TableNotFoundSnafu {
137                table_name: format_full_table_name(
138                    &self.data.catalog_name,
139                    &self.data.schema_name,
140                    &self.data.object_name,
141                ),
142            })?;
143
144        // For column comments, validate the column exists
145        if self.data.object_type == CommentObjectType::Column {
146            let column_name = self.data.column_name.as_ref().unwrap();
147            let column_exists = table_info
148                .table_info
149                .meta
150                .schema
151                .column_schemas()
152                .iter()
153                .any(|col| &col.name == column_name);
154
155            ensure!(
156                column_exists,
157                ColumnNotFoundSnafu {
158                    column_name,
159                    column_id: 0u32, // column_id is not known here
160                }
161            );
162        }
163
164        self.data.table_id = Some(table_id);
165
166        // Check if comment is unchanged for early exit optimization
167        match self.data.object_type {
168            CommentObjectType::Table => {
169                let current_comment = &table_info.table_info.desc;
170                if &self.data.comment == current_comment {
171                    self.data.is_unchanged = true;
172                }
173            }
174            CommentObjectType::Column => {
175                let column_name = self.data.column_name.as_ref().unwrap();
176                let column_schema = table_info
177                    .table_info
178                    .meta
179                    .schema
180                    .column_schemas()
181                    .iter()
182                    .find(|col| &col.name == column_name)
183                    .unwrap(); // Safe: validated above
184
185                let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
186                if self.data.comment.as_deref() == current_comment.map(String::as_str) {
187                    self.data.is_unchanged = true;
188                }
189            }
190            CommentObjectType::Flow => {
191                // this branch is handled in `prepare_flow`
192            }
193        }
194
195        self.data.table_info = Some(table_info);
196
197        Ok(())
198    }
199
200    async fn prepare_flow(&mut self) -> Result<()> {
201        let flow_name_value = self
202            .context
203            .flow_metadata_manager
204            .flow_name_manager()
205            .get(&self.data.catalog_name, &self.data.object_name)
206            .await?
207            .with_context(|| FlowNotFoundSnafu {
208                flow_name: &self.data.object_name,
209            })?;
210
211        let flow_id = flow_name_value.flow_id();
212        let flow_info = self
213            .context
214            .flow_metadata_manager
215            .flow_info_manager()
216            .get_raw(flow_id)
217            .await?
218            .with_context(|| FlowNotFoundSnafu {
219                flow_name: &self.data.object_name,
220            })?;
221
222        self.data.flow_id = Some(flow_id);
223
224        // Check if comment is unchanged for early exit optimization
225        let current_comment = &flow_info.get_inner_ref().comment;
226        let new_comment = self.data.comment.as_deref().unwrap_or("");
227        if new_comment == current_comment.as_str() {
228            self.data.is_unchanged = true;
229        }
230
231        self.data.flow_info = Some(flow_info);
232
233        Ok(())
234    }
235
236    pub async fn on_update_metadata(&mut self) -> Result<Status> {
237        match self.data.object_type {
238            CommentObjectType::Table => {
239                self.update_table_comment().await?;
240            }
241            CommentObjectType::Column => {
242                self.update_column_comment().await?;
243            }
244            CommentObjectType::Flow => {
245                self.update_flow_comment().await?;
246            }
247        }
248
249        self.data.state = CommentOnState::InvalidateCache;
250        Ok(Status::executing(true))
251    }
252
253    async fn update_table_comment(&mut self) -> Result<()> {
254        let table_info_value = self.data.table_info.as_ref().unwrap();
255        let mut new_table_info = table_info_value.table_info.clone();
256
257        new_table_info.desc = self.data.comment.clone();
258
259        // Sync comment to table options
260        sync_table_comment_option(
261            &mut new_table_info.meta.options,
262            new_table_info.desc.as_deref(),
263        );
264
265        self.update_table_info(table_info_value, new_table_info)
266            .await?;
267
268        info!(
269            "Updated comment for table {}.{}.{}",
270            self.data.catalog_name, self.data.schema_name, self.data.object_name
271        );
272
273        Ok(())
274    }
275
276    async fn update_column_comment(&mut self) -> Result<()> {
277        let table_info_value = self.data.table_info.as_ref().unwrap();
278        let mut new_table_info = table_info_value.table_info.clone();
279
280        let column_name = self.data.column_name.as_ref().unwrap();
281        let mut column_schemas = new_table_info.meta.schema.column_schemas().to_vec();
282        let column_schema = column_schemas
283            .iter_mut()
284            .find(|col| &col.name == column_name)
285            .unwrap(); // Safe: validated in prepare
286
287        update_column_comment_metadata(column_schema, self.data.comment.clone());
288
289        new_table_info.meta.schema = Arc::new(Schema::new_with_version(
290            column_schemas,
291            new_table_info.meta.schema.version(),
292        ));
293        self.update_table_info(table_info_value, new_table_info)
294            .await?;
295
296        info!(
297            "Updated comment for column {}.{}.{}.{}",
298            self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
299        );
300
301        Ok(())
302    }
303
304    async fn update_flow_comment(&mut self) -> Result<()> {
305        let flow_id = self.data.flow_id.unwrap();
306        let flow_info_value = self.data.flow_info.as_ref().unwrap();
307
308        let mut new_flow_info = flow_info_value.get_inner_ref().clone();
309        new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
310        new_flow_info.updated_time = Utc::now();
311
312        let raw_value = new_flow_info.try_as_raw_value()?;
313
314        self.context
315            .table_metadata_manager
316            .kv_backend()
317            .put(
318                PutRequest::new()
319                    .with_key(FlowInfoKey::new(flow_id).to_bytes())
320                    .with_value(raw_value),
321            )
322            .await?;
323
324        info!(
325            "Updated comment for flow {}.{}",
326            self.data.catalog_name, self.data.object_name
327        );
328
329        Ok(())
330    }
331
332    async fn update_table_info(
333        &self,
334        current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
335        new_table_info: TableInfo,
336    ) -> Result<()> {
337        let table_id = current_table_info.table_info.ident.table_id;
338        let new_table_info_value = current_table_info.update(new_table_info);
339        let raw_value = new_table_info_value.try_as_raw_value()?;
340
341        self.context
342            .table_metadata_manager
343            .kv_backend()
344            .put(
345                PutRequest::new()
346                    .with_key(TableInfoKey::new(table_id).to_bytes())
347                    .with_value(raw_value),
348            )
349            .await?;
350
351        Ok(())
352    }
353
354    pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
355        let cache_invalidator = &self.context.cache_invalidator;
356
357        match self.data.object_type {
358            CommentObjectType::Table | CommentObjectType::Column => {
359                let table_id = self.data.table_id.unwrap();
360                let table_name = TableName::new(
361                    self.data.catalog_name.clone(),
362                    self.data.schema_name.clone(),
363                    self.data.object_name.clone(),
364                );
365
366                let cache_ident = vec![
367                    CacheIdent::TableId(table_id),
368                    CacheIdent::TableName(table_name),
369                ];
370
371                cache_invalidator
372                    .invalidate(&Context::default(), &cache_ident)
373                    .await?;
374            }
375            CommentObjectType::Flow => {
376                let flow_id = self.data.flow_id.unwrap();
377                let cache_ident = vec![CacheIdent::FlowId(flow_id)];
378
379                cache_invalidator
380                    .invalidate(&Context::default(), &cache_ident)
381                    .await?;
382            }
383        }
384
385        Ok(Status::done())
386    }
387}
388
389#[async_trait]
390impl Procedure for CommentOnProcedure {
391    fn type_name(&self) -> &str {
392        Self::TYPE_NAME
393    }
394
395    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
396        match self.data.state {
397            CommentOnState::Prepare => self.on_prepare().await,
398            CommentOnState::UpdateMetadata => self.on_update_metadata().await,
399            CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
400        }
401        .map_err(map_to_procedure_error)
402    }
403
404    fn dump(&self) -> ProcedureResult<String> {
405        serde_json::to_string(&self.data).context(ToJsonSnafu)
406    }
407
408    fn lock_key(&self) -> LockKey {
409        let catalog = &self.data.catalog_name;
410        let schema = &self.data.schema_name;
411
412        let lock_key = match self.data.object_type {
413            CommentObjectType::Table | CommentObjectType::Column => {
414                vec![
415                    CatalogLock::Read(catalog).into(),
416                    SchemaLock::read(catalog, schema).into(),
417                    TableNameLock::new(catalog, schema, &self.data.object_name).into(),
418                ]
419            }
420            CommentObjectType::Flow => {
421                vec![
422                    CatalogLock::Read(catalog).into(),
423                    FlowNameLock::new(catalog, &self.data.object_name).into(),
424                ]
425            }
426        };
427
428        LockKey::new(lock_key)
429    }
430}
431
432#[derive(Debug, Serialize, Deserialize, AsRefStr)]
433enum CommentOnState {
434    Prepare,
435    UpdateMetadata,
436    InvalidateCache,
437}
438
439/// The data of comment on procedure.
440#[derive(Debug, Serialize, Deserialize)]
441pub struct CommentOnData {
442    state: CommentOnState,
443    catalog_name: String,
444    schema_name: String,
445    object_type: CommentObjectType,
446    object_name: String,
447    /// Column name (only for Column comments)
448    column_name: Option<String>,
449    comment: Option<String>,
450    /// Cached table ID (for Table/Column)
451    #[serde(skip_serializing_if = "Option::is_none")]
452    table_id: Option<TableId>,
453    /// Cached table info (for Table/Column)
454    #[serde(skip)]
455    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
456    /// Cached flow ID (for Flow)
457    #[serde(skip_serializing_if = "Option::is_none")]
458    flow_id: Option<FlowId>,
459    /// Cached flow info (for Flow)
460    #[serde(skip)]
461    flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
462    /// Whether the comment is unchanged (optimization for early exit)
463    #[serde(skip)]
464    is_unchanged: bool,
465}
466
467impl CommentOnData {
468    pub fn new(task: CommentOnTask) -> Self {
469        Self {
470            state: CommentOnState::Prepare,
471            catalog_name: task.catalog_name,
472            schema_name: task.schema_name,
473            object_type: task.object_type,
474            object_name: task.object_name,
475            column_name: task.column_name,
476            comment: task.comment,
477            table_id: None,
478            table_info: None,
479            flow_id: None,
480            flow_info: None,
481            is_unchanged: false,
482        }
483    }
484}
485
486fn update_column_comment_metadata(
487    column_schema: &mut datatypes::schema::ColumnSchema,
488    comment: Option<String>,
489) {
490    match comment {
491        Some(value) => {
492            column_schema
493                .mut_metadata()
494                .insert(COLUMN_COMMENT_KEY.to_string(), value);
495        }
496        None => {
497            column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
498        }
499    }
500}
501
502fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
503    match comment {
504        Some(value) => {
505            options
506                .extra_options
507                .insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
508        }
509        None => {
510            options.extra_options.remove(TABLE_COMMENT_KEY);
511        }
512    }
513}