1use async_trait::async_trait;
16use chrono::Utc;
17use common_catalog::format_full_table_name;
18use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
19use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
20use common_telemetry::tracing::info;
21use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use store_api::storage::TableId;
25use strum::AsRefStr;
26use table::metadata::RawTableInfo;
27use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
28use table::table_name::TableName;
29
30use crate::cache_invalidator::Context;
31use crate::ddl::DdlContext;
32use crate::ddl::utils::map_to_procedure_error;
33use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
34use crate::instruction::CacheIdent;
35use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
36use crate::key::table_info::{TableInfoKey, TableInfoValue};
37use crate::key::table_name::TableNameKey;
38use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
39use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
40use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
41use crate::rpc::store::PutRequest;
42
43pub struct CommentOnProcedure {
44 pub context: DdlContext,
45 pub data: CommentOnData,
46}
47
48impl CommentOnProcedure {
49 pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
50
51 pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
52 Self {
53 context,
54 data: CommentOnData::new(task),
55 }
56 }
57
58 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
59 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
60
61 Ok(Self { context, data })
62 }
63
64 pub async fn on_prepare(&mut self) -> Result<Status> {
65 match self.data.object_type {
66 CommentObjectType::Table | CommentObjectType::Column => {
67 self.prepare_table_or_column().await?;
68 }
69 CommentObjectType::Flow => {
70 self.prepare_flow().await?;
71 }
72 }
73
74 if self.data.is_unchanged {
76 let object_desc = match self.data.object_type {
77 CommentObjectType::Table => format!(
78 "table {}",
79 format_full_table_name(
80 &self.data.catalog_name,
81 &self.data.schema_name,
82 &self.data.object_name,
83 )
84 ),
85 CommentObjectType::Column => format!(
86 "column {}.{}",
87 format_full_table_name(
88 &self.data.catalog_name,
89 &self.data.schema_name,
90 &self.data.object_name,
91 ),
92 self.data.column_name.as_ref().unwrap()
93 ),
94 CommentObjectType::Flow => {
95 format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
96 }
97 };
98 info!("Comment unchanged for {}, skipping update", object_desc);
99 return Ok(Status::done());
100 }
101
102 self.data.state = CommentOnState::UpdateMetadata;
103 Ok(Status::executing(true))
104 }
105
106 async fn prepare_table_or_column(&mut self) -> Result<()> {
107 let table_name_key = TableNameKey::new(
108 &self.data.catalog_name,
109 &self.data.schema_name,
110 &self.data.object_name,
111 );
112
113 let table_id = self
114 .context
115 .table_metadata_manager
116 .table_name_manager()
117 .get(table_name_key)
118 .await?
119 .with_context(|| TableNotFoundSnafu {
120 table_name: format_full_table_name(
121 &self.data.catalog_name,
122 &self.data.schema_name,
123 &self.data.object_name,
124 ),
125 })?
126 .table_id();
127
128 let table_info = self
129 .context
130 .table_metadata_manager
131 .table_info_manager()
132 .get(table_id)
133 .await?
134 .with_context(|| TableNotFoundSnafu {
135 table_name: format_full_table_name(
136 &self.data.catalog_name,
137 &self.data.schema_name,
138 &self.data.object_name,
139 ),
140 })?;
141
142 if self.data.object_type == CommentObjectType::Column {
144 let column_name = self.data.column_name.as_ref().unwrap();
145 let column_exists = table_info
146 .table_info
147 .meta
148 .schema
149 .column_schemas
150 .iter()
151 .any(|col| &col.name == column_name);
152
153 ensure!(
154 column_exists,
155 ColumnNotFoundSnafu {
156 column_name,
157 column_id: 0u32, }
159 );
160 }
161
162 self.data.table_id = Some(table_id);
163
164 match self.data.object_type {
166 CommentObjectType::Table => {
167 let current_comment = &table_info.table_info.desc;
168 if &self.data.comment == current_comment {
169 self.data.is_unchanged = true;
170 }
171 }
172 CommentObjectType::Column => {
173 let column_name = self.data.column_name.as_ref().unwrap();
174 let column_schema = table_info
175 .table_info
176 .meta
177 .schema
178 .column_schemas
179 .iter()
180 .find(|col| &col.name == column_name)
181 .unwrap(); let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
184 if self.data.comment.as_deref() == current_comment.map(String::as_str) {
185 self.data.is_unchanged = true;
186 }
187 }
188 CommentObjectType::Flow => {
189 }
191 }
192
193 self.data.table_info = Some(table_info);
194
195 Ok(())
196 }
197
198 async fn prepare_flow(&mut self) -> Result<()> {
199 let flow_name_value = self
200 .context
201 .flow_metadata_manager
202 .flow_name_manager()
203 .get(&self.data.catalog_name, &self.data.object_name)
204 .await?
205 .with_context(|| FlowNotFoundSnafu {
206 flow_name: &self.data.object_name,
207 })?;
208
209 let flow_id = flow_name_value.flow_id();
210 let flow_info = self
211 .context
212 .flow_metadata_manager
213 .flow_info_manager()
214 .get_raw(flow_id)
215 .await?
216 .with_context(|| FlowNotFoundSnafu {
217 flow_name: &self.data.object_name,
218 })?;
219
220 self.data.flow_id = Some(flow_id);
221
222 let current_comment = &flow_info.get_inner_ref().comment;
224 let new_comment = self.data.comment.as_deref().unwrap_or("");
225 if new_comment == current_comment.as_str() {
226 self.data.is_unchanged = true;
227 }
228
229 self.data.flow_info = Some(flow_info);
230
231 Ok(())
232 }
233
234 pub async fn on_update_metadata(&mut self) -> Result<Status> {
235 match self.data.object_type {
236 CommentObjectType::Table => {
237 self.update_table_comment().await?;
238 }
239 CommentObjectType::Column => {
240 self.update_column_comment().await?;
241 }
242 CommentObjectType::Flow => {
243 self.update_flow_comment().await?;
244 }
245 }
246
247 self.data.state = CommentOnState::InvalidateCache;
248 Ok(Status::executing(true))
249 }
250
251 async fn update_table_comment(&mut self) -> Result<()> {
252 let table_info_value = self.data.table_info.as_ref().unwrap();
253 let mut new_table_info = table_info_value.table_info.clone();
254
255 new_table_info.desc = self.data.comment.clone();
256
257 sync_table_comment_option(
259 &mut new_table_info.meta.options,
260 new_table_info.desc.as_deref(),
261 );
262
263 self.update_table_info(table_info_value, new_table_info)
264 .await?;
265
266 info!(
267 "Updated comment for table {}.{}.{}",
268 self.data.catalog_name, self.data.schema_name, self.data.object_name
269 );
270
271 Ok(())
272 }
273
274 async fn update_column_comment(&mut self) -> Result<()> {
275 let table_info_value = self.data.table_info.as_ref().unwrap();
276 let mut new_table_info = table_info_value.table_info.clone();
277
278 let column_name = self.data.column_name.as_ref().unwrap();
279 let column_schema = new_table_info
280 .meta
281 .schema
282 .column_schemas
283 .iter_mut()
284 .find(|col| &col.name == column_name)
285 .unwrap(); update_column_comment_metadata(column_schema, self.data.comment.clone());
288
289 self.update_table_info(table_info_value, new_table_info)
290 .await?;
291
292 info!(
293 "Updated comment for column {}.{}.{}.{}",
294 self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
295 );
296
297 Ok(())
298 }
299
300 async fn update_flow_comment(&mut self) -> Result<()> {
301 let flow_id = self.data.flow_id.unwrap();
302 let flow_info_value = self.data.flow_info.as_ref().unwrap();
303
304 let mut new_flow_info = flow_info_value.get_inner_ref().clone();
305 new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
306 new_flow_info.updated_time = Utc::now();
307
308 let raw_value = new_flow_info.try_as_raw_value()?;
309
310 self.context
311 .table_metadata_manager
312 .kv_backend()
313 .put(
314 PutRequest::new()
315 .with_key(FlowInfoKey::new(flow_id).to_bytes())
316 .with_value(raw_value),
317 )
318 .await?;
319
320 info!(
321 "Updated comment for flow {}.{}",
322 self.data.catalog_name, self.data.object_name
323 );
324
325 Ok(())
326 }
327
328 async fn update_table_info(
329 &self,
330 current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
331 new_table_info: RawTableInfo,
332 ) -> Result<()> {
333 let table_id = current_table_info.table_info.ident.table_id;
334 let new_table_info_value = current_table_info.update(new_table_info);
335 let raw_value = new_table_info_value.try_as_raw_value()?;
336
337 self.context
338 .table_metadata_manager
339 .kv_backend()
340 .put(
341 PutRequest::new()
342 .with_key(TableInfoKey::new(table_id).to_bytes())
343 .with_value(raw_value),
344 )
345 .await?;
346
347 Ok(())
348 }
349
350 pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
351 let cache_invalidator = &self.context.cache_invalidator;
352
353 match self.data.object_type {
354 CommentObjectType::Table | CommentObjectType::Column => {
355 let table_id = self.data.table_id.unwrap();
356 let table_name = TableName::new(
357 self.data.catalog_name.clone(),
358 self.data.schema_name.clone(),
359 self.data.object_name.clone(),
360 );
361
362 let cache_ident = vec![
363 CacheIdent::TableId(table_id),
364 CacheIdent::TableName(table_name),
365 ];
366
367 cache_invalidator
368 .invalidate(&Context::default(), &cache_ident)
369 .await?;
370 }
371 CommentObjectType::Flow => {
372 let flow_id = self.data.flow_id.unwrap();
373 let cache_ident = vec![CacheIdent::FlowId(flow_id)];
374
375 cache_invalidator
376 .invalidate(&Context::default(), &cache_ident)
377 .await?;
378 }
379 }
380
381 Ok(Status::done())
382 }
383}
384
385#[async_trait]
386impl Procedure for CommentOnProcedure {
387 fn type_name(&self) -> &str {
388 Self::TYPE_NAME
389 }
390
391 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
392 match self.data.state {
393 CommentOnState::Prepare => self.on_prepare().await,
394 CommentOnState::UpdateMetadata => self.on_update_metadata().await,
395 CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
396 }
397 .map_err(map_to_procedure_error)
398 }
399
400 fn dump(&self) -> ProcedureResult<String> {
401 serde_json::to_string(&self.data).context(ToJsonSnafu)
402 }
403
404 fn lock_key(&self) -> LockKey {
405 let catalog = &self.data.catalog_name;
406 let schema = &self.data.schema_name;
407
408 let lock_key = match self.data.object_type {
409 CommentObjectType::Table | CommentObjectType::Column => {
410 vec![
411 CatalogLock::Read(catalog).into(),
412 SchemaLock::read(catalog, schema).into(),
413 TableNameLock::new(catalog, schema, &self.data.object_name).into(),
414 ]
415 }
416 CommentObjectType::Flow => {
417 vec![
418 CatalogLock::Read(catalog).into(),
419 FlowNameLock::new(catalog, &self.data.object_name).into(),
420 ]
421 }
422 };
423
424 LockKey::new(lock_key)
425 }
426}
427
428#[derive(Debug, Serialize, Deserialize, AsRefStr)]
429enum CommentOnState {
430 Prepare,
431 UpdateMetadata,
432 InvalidateCache,
433}
434
435#[derive(Debug, Serialize, Deserialize)]
437pub struct CommentOnData {
438 state: CommentOnState,
439 catalog_name: String,
440 schema_name: String,
441 object_type: CommentObjectType,
442 object_name: String,
443 column_name: Option<String>,
445 comment: Option<String>,
446 #[serde(skip_serializing_if = "Option::is_none")]
448 table_id: Option<TableId>,
449 #[serde(skip)]
451 table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
452 #[serde(skip_serializing_if = "Option::is_none")]
454 flow_id: Option<FlowId>,
455 #[serde(skip)]
457 flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
458 #[serde(skip)]
460 is_unchanged: bool,
461}
462
463impl CommentOnData {
464 pub fn new(task: CommentOnTask) -> Self {
465 Self {
466 state: CommentOnState::Prepare,
467 catalog_name: task.catalog_name,
468 schema_name: task.schema_name,
469 object_type: task.object_type,
470 object_name: task.object_name,
471 column_name: task.column_name,
472 comment: task.comment,
473 table_id: None,
474 table_info: None,
475 flow_id: None,
476 flow_info: None,
477 is_unchanged: false,
478 }
479 }
480}
481
482fn update_column_comment_metadata(
483 column_schema: &mut datatypes::schema::ColumnSchema,
484 comment: Option<String>,
485) {
486 match comment {
487 Some(value) => {
488 column_schema
489 .mut_metadata()
490 .insert(COLUMN_COMMENT_KEY.to_string(), value);
491 }
492 None => {
493 column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
494 }
495 }
496}
497
498fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
499 match comment {
500 Some(value) => {
501 options
502 .extra_options
503 .insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
504 }
505 None => {
506 options.extra_options.remove(TABLE_COMMENT_KEY);
507 }
508 }
509}