1use std::sync::Arc;
16
17use async_trait::async_trait;
18use chrono::Utc;
19use common_catalog::format_full_table_name;
20use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
21use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
22use common_telemetry::tracing::info;
23use datatypes::schema::{COMMENT_KEY as COLUMN_COMMENT_KEY, Schema};
24use serde::{Deserialize, Serialize};
25use snafu::{OptionExt, ResultExt, ensure};
26use store_api::storage::TableId;
27use strum::AsRefStr;
28use table::metadata::TableInfo;
29use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
30use table::table_name::TableName;
31
32use crate::cache_invalidator::Context;
33use crate::ddl::DdlContext;
34use crate::ddl::utils::map_to_procedure_error;
35use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
36use crate::instruction::CacheIdent;
37use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
38use crate::key::table_info::{TableInfoKey, TableInfoValue};
39use crate::key::table_name::TableNameKey;
40use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
41use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
42use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
43use crate::rpc::store::PutRequest;
44
45pub struct CommentOnProcedure {
46 pub context: DdlContext,
47 pub data: CommentOnData,
48}
49
50impl CommentOnProcedure {
51 pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
52
53 pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
54 Self {
55 context,
56 data: CommentOnData::new(task),
57 }
58 }
59
60 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
61 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
62
63 Ok(Self { context, data })
64 }
65
66 pub async fn on_prepare(&mut self) -> Result<Status> {
67 match self.data.object_type {
68 CommentObjectType::Table | CommentObjectType::Column => {
69 self.prepare_table_or_column().await?;
70 }
71 CommentObjectType::Flow => {
72 self.prepare_flow().await?;
73 }
74 }
75
76 if self.data.is_unchanged {
78 let object_desc = match self.data.object_type {
79 CommentObjectType::Table => format!(
80 "table {}",
81 format_full_table_name(
82 &self.data.catalog_name,
83 &self.data.schema_name,
84 &self.data.object_name,
85 )
86 ),
87 CommentObjectType::Column => format!(
88 "column {}.{}",
89 format_full_table_name(
90 &self.data.catalog_name,
91 &self.data.schema_name,
92 &self.data.object_name,
93 ),
94 self.data.column_name.as_ref().unwrap()
95 ),
96 CommentObjectType::Flow => {
97 format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
98 }
99 };
100 info!("Comment unchanged for {}, skipping update", object_desc);
101 return Ok(Status::done());
102 }
103
104 self.data.state = CommentOnState::UpdateMetadata;
105 Ok(Status::executing(true))
106 }
107
108 async fn prepare_table_or_column(&mut self) -> Result<()> {
109 let table_name_key = TableNameKey::new(
110 &self.data.catalog_name,
111 &self.data.schema_name,
112 &self.data.object_name,
113 );
114
115 let table_id = self
116 .context
117 .table_metadata_manager
118 .table_name_manager()
119 .get(table_name_key)
120 .await?
121 .with_context(|| TableNotFoundSnafu {
122 table_name: format_full_table_name(
123 &self.data.catalog_name,
124 &self.data.schema_name,
125 &self.data.object_name,
126 ),
127 })?
128 .table_id();
129
130 let table_info = self
131 .context
132 .table_metadata_manager
133 .table_info_manager()
134 .get(table_id)
135 .await?
136 .with_context(|| TableNotFoundSnafu {
137 table_name: format_full_table_name(
138 &self.data.catalog_name,
139 &self.data.schema_name,
140 &self.data.object_name,
141 ),
142 })?;
143
144 if self.data.object_type == CommentObjectType::Column {
146 let column_name = self.data.column_name.as_ref().unwrap();
147 let column_exists = table_info
148 .table_info
149 .meta
150 .schema
151 .column_schemas()
152 .iter()
153 .any(|col| &col.name == column_name);
154
155 ensure!(
156 column_exists,
157 ColumnNotFoundSnafu {
158 column_name,
159 column_id: 0u32, }
161 );
162 }
163
164 self.data.table_id = Some(table_id);
165
166 match self.data.object_type {
168 CommentObjectType::Table => {
169 let current_comment = &table_info.table_info.desc;
170 if &self.data.comment == current_comment {
171 self.data.is_unchanged = true;
172 }
173 }
174 CommentObjectType::Column => {
175 let column_name = self.data.column_name.as_ref().unwrap();
176 let column_schema = table_info
177 .table_info
178 .meta
179 .schema
180 .column_schemas()
181 .iter()
182 .find(|col| &col.name == column_name)
183 .unwrap(); let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
186 if self.data.comment.as_deref() == current_comment.map(String::as_str) {
187 self.data.is_unchanged = true;
188 }
189 }
190 CommentObjectType::Flow => {
191 }
193 }
194
195 self.data.table_info = Some(table_info);
196
197 Ok(())
198 }
199
200 async fn prepare_flow(&mut self) -> Result<()> {
201 let flow_name_value = self
202 .context
203 .flow_metadata_manager
204 .flow_name_manager()
205 .get(&self.data.catalog_name, &self.data.object_name)
206 .await?
207 .with_context(|| FlowNotFoundSnafu {
208 flow_name: &self.data.object_name,
209 })?;
210
211 let flow_id = flow_name_value.flow_id();
212 let flow_info = self
213 .context
214 .flow_metadata_manager
215 .flow_info_manager()
216 .get_raw(flow_id)
217 .await?
218 .with_context(|| FlowNotFoundSnafu {
219 flow_name: &self.data.object_name,
220 })?;
221
222 self.data.flow_id = Some(flow_id);
223
224 let current_comment = &flow_info.get_inner_ref().comment;
226 let new_comment = self.data.comment.as_deref().unwrap_or("");
227 if new_comment == current_comment.as_str() {
228 self.data.is_unchanged = true;
229 }
230
231 self.data.flow_info = Some(flow_info);
232
233 Ok(())
234 }
235
236 pub async fn on_update_metadata(&mut self) -> Result<Status> {
237 match self.data.object_type {
238 CommentObjectType::Table => {
239 self.update_table_comment().await?;
240 }
241 CommentObjectType::Column => {
242 self.update_column_comment().await?;
243 }
244 CommentObjectType::Flow => {
245 self.update_flow_comment().await?;
246 }
247 }
248
249 self.data.state = CommentOnState::InvalidateCache;
250 Ok(Status::executing(true))
251 }
252
253 async fn update_table_comment(&mut self) -> Result<()> {
254 let table_info_value = self.data.table_info.as_ref().unwrap();
255 let mut new_table_info = table_info_value.table_info.clone();
256
257 new_table_info.desc = self.data.comment.clone();
258
259 sync_table_comment_option(
261 &mut new_table_info.meta.options,
262 new_table_info.desc.as_deref(),
263 );
264
265 self.update_table_info(table_info_value, new_table_info)
266 .await?;
267
268 info!(
269 "Updated comment for table {}.{}.{}",
270 self.data.catalog_name, self.data.schema_name, self.data.object_name
271 );
272
273 Ok(())
274 }
275
276 async fn update_column_comment(&mut self) -> Result<()> {
277 let table_info_value = self.data.table_info.as_ref().unwrap();
278 let mut new_table_info = table_info_value.table_info.clone();
279
280 let column_name = self.data.column_name.as_ref().unwrap();
281 let mut column_schemas = new_table_info.meta.schema.column_schemas().to_vec();
282 let column_schema = column_schemas
283 .iter_mut()
284 .find(|col| &col.name == column_name)
285 .unwrap(); update_column_comment_metadata(column_schema, self.data.comment.clone());
288
289 new_table_info.meta.schema = Arc::new(Schema::new_with_version(
290 column_schemas,
291 new_table_info.meta.schema.version(),
292 ));
293 self.update_table_info(table_info_value, new_table_info)
294 .await?;
295
296 info!(
297 "Updated comment for column {}.{}.{}.{}",
298 self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
299 );
300
301 Ok(())
302 }
303
304 async fn update_flow_comment(&mut self) -> Result<()> {
305 let flow_id = self.data.flow_id.unwrap();
306 let flow_info_value = self.data.flow_info.as_ref().unwrap();
307
308 let mut new_flow_info = flow_info_value.get_inner_ref().clone();
309 new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
310 new_flow_info.updated_time = Utc::now();
311
312 let raw_value = new_flow_info.try_as_raw_value()?;
313
314 self.context
315 .table_metadata_manager
316 .kv_backend()
317 .put(
318 PutRequest::new()
319 .with_key(FlowInfoKey::new(flow_id).to_bytes())
320 .with_value(raw_value),
321 )
322 .await?;
323
324 info!(
325 "Updated comment for flow {}.{}",
326 self.data.catalog_name, self.data.object_name
327 );
328
329 Ok(())
330 }
331
332 async fn update_table_info(
333 &self,
334 current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
335 new_table_info: TableInfo,
336 ) -> Result<()> {
337 let table_id = current_table_info.table_info.ident.table_id;
338 let new_table_info_value = current_table_info.update(new_table_info);
339 let raw_value = new_table_info_value.try_as_raw_value()?;
340
341 self.context
342 .table_metadata_manager
343 .kv_backend()
344 .put(
345 PutRequest::new()
346 .with_key(TableInfoKey::new(table_id).to_bytes())
347 .with_value(raw_value),
348 )
349 .await?;
350
351 Ok(())
352 }
353
354 pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
355 let cache_invalidator = &self.context.cache_invalidator;
356
357 match self.data.object_type {
358 CommentObjectType::Table | CommentObjectType::Column => {
359 let table_id = self.data.table_id.unwrap();
360 let table_name = TableName::new(
361 self.data.catalog_name.clone(),
362 self.data.schema_name.clone(),
363 self.data.object_name.clone(),
364 );
365
366 let cache_ident = vec![
367 CacheIdent::TableId(table_id),
368 CacheIdent::TableName(table_name),
369 ];
370
371 cache_invalidator
372 .invalidate(&Context::default(), &cache_ident)
373 .await?;
374 }
375 CommentObjectType::Flow => {
376 let flow_id = self.data.flow_id.unwrap();
377 let cache_ident = vec![CacheIdent::FlowId(flow_id)];
378
379 cache_invalidator
380 .invalidate(&Context::default(), &cache_ident)
381 .await?;
382 }
383 }
384
385 Ok(Status::done())
386 }
387}
388
389#[async_trait]
390impl Procedure for CommentOnProcedure {
391 fn type_name(&self) -> &str {
392 Self::TYPE_NAME
393 }
394
395 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
396 match self.data.state {
397 CommentOnState::Prepare => self.on_prepare().await,
398 CommentOnState::UpdateMetadata => self.on_update_metadata().await,
399 CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
400 }
401 .map_err(map_to_procedure_error)
402 }
403
404 fn dump(&self) -> ProcedureResult<String> {
405 serde_json::to_string(&self.data).context(ToJsonSnafu)
406 }
407
408 fn lock_key(&self) -> LockKey {
409 let catalog = &self.data.catalog_name;
410 let schema = &self.data.schema_name;
411
412 let lock_key = match self.data.object_type {
413 CommentObjectType::Table | CommentObjectType::Column => {
414 vec![
415 CatalogLock::Read(catalog).into(),
416 SchemaLock::read(catalog, schema).into(),
417 TableNameLock::new(catalog, schema, &self.data.object_name).into(),
418 ]
419 }
420 CommentObjectType::Flow => {
421 vec![
422 CatalogLock::Read(catalog).into(),
423 FlowNameLock::new(catalog, &self.data.object_name).into(),
424 ]
425 }
426 };
427
428 LockKey::new(lock_key)
429 }
430}
431
432#[derive(Debug, Serialize, Deserialize, AsRefStr)]
433enum CommentOnState {
434 Prepare,
435 UpdateMetadata,
436 InvalidateCache,
437}
438
439#[derive(Debug, Serialize, Deserialize)]
441pub struct CommentOnData {
442 state: CommentOnState,
443 catalog_name: String,
444 schema_name: String,
445 object_type: CommentObjectType,
446 object_name: String,
447 column_name: Option<String>,
449 comment: Option<String>,
450 #[serde(skip_serializing_if = "Option::is_none")]
452 table_id: Option<TableId>,
453 #[serde(skip)]
455 table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
456 #[serde(skip_serializing_if = "Option::is_none")]
458 flow_id: Option<FlowId>,
459 #[serde(skip)]
461 flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
462 #[serde(skip)]
464 is_unchanged: bool,
465}
466
467impl CommentOnData {
468 pub fn new(task: CommentOnTask) -> Self {
469 Self {
470 state: CommentOnState::Prepare,
471 catalog_name: task.catalog_name,
472 schema_name: task.schema_name,
473 object_type: task.object_type,
474 object_name: task.object_name,
475 column_name: task.column_name,
476 comment: task.comment,
477 table_id: None,
478 table_info: None,
479 flow_id: None,
480 flow_info: None,
481 is_unchanged: false,
482 }
483 }
484}
485
486fn update_column_comment_metadata(
487 column_schema: &mut datatypes::schema::ColumnSchema,
488 comment: Option<String>,
489) {
490 match comment {
491 Some(value) => {
492 column_schema
493 .mut_metadata()
494 .insert(COLUMN_COMMENT_KEY.to_string(), value);
495 }
496 None => {
497 column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
498 }
499 }
500}
501
502fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
503 match comment {
504 Some(value) => {
505 options
506 .extra_options
507 .insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
508 }
509 None => {
510 options.extra_options.remove(TABLE_COMMENT_KEY);
511 }
512 }
513}