common_meta/ddl/
drop_view.rs1use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
17use common_procedure::{
18 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
19};
20use common_telemetry::info;
21use serde::{Deserialize, Serialize};
22use snafu::{ensure, OptionExt, ResultExt};
23use strum::AsRefStr;
24use table::metadata::{RawTableInfo, TableId, TableType};
25use table::table_reference::TableReference;
26
27use crate::cache_invalidator::Context;
28use crate::ddl::utils::handle_retry_error;
29use crate::ddl::DdlContext;
30use crate::error::{self, Result};
31use crate::instruction::CacheIdent;
32use crate::key::table_name::TableNameKey;
33use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
34use crate::metrics;
35use crate::rpc::ddl::DropViewTask;
36
37pub struct DropViewProcedure {
39 pub(crate) context: DdlContext,
41 pub(crate) data: DropViewData,
43}
44
45impl DropViewProcedure {
46 pub const TYPE_NAME: &'static str = "metasrv-procedure::DropView";
47
48 pub fn new(task: DropViewTask, context: DdlContext) -> Self {
49 Self {
50 context,
51 data: DropViewData {
52 state: DropViewState::Prepare,
53 task,
54 },
55 }
56 }
57
58 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
59 let data: DropViewData = serde_json::from_str(json).context(FromJsonSnafu)?;
60
61 Ok(Self { context, data })
62 }
63
64 #[cfg(test)]
65 pub(crate) fn state(&self) -> DropViewState {
66 self.data.state
67 }
68
69 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
73 let table_ref = self.data.table_ref();
74
75 let exists = self
76 .context
77 .table_metadata_manager
78 .table_name_manager()
79 .exists(TableNameKey::new(
80 table_ref.catalog,
81 table_ref.schema,
82 table_ref.table,
83 ))
84 .await?;
85
86 if !exists && self.data.task.drop_if_exists {
87 return Ok(Status::done());
88 }
89
90 ensure!(
91 exists,
92 error::ViewNotFoundSnafu {
93 view_name: table_ref.to_string(),
94 }
95 );
96
97 self.check_view_metadata().await?;
98 self.data.state = DropViewState::DeleteMetadata;
99
100 Ok(Status::executing(true))
101 }
102
103 async fn check_view_metadata(&mut self) -> Result<()> {
104 let view_id = self.data.view_id();
105 let table_info_value = self
106 .context
107 .table_metadata_manager
108 .table_info_manager()
109 .get(view_id)
110 .await?
111 .with_context(|| error::TableInfoNotFoundSnafu {
112 table: self.data.table_ref().to_string(),
113 })?;
114
115 self.ensure_is_view(&table_info_value.table_info)?;
116 self.ensure_view_info_exists(view_id).await?;
117
118 Ok(())
119 }
120
121 fn ensure_is_view(&self, table_info: &RawTableInfo) -> Result<()> {
122 ensure!(
123 table_info.table_type == TableType::View,
124 error::InvalidViewInfoSnafu {
125 err_msg: format!("{} is not a view", self.data.table_ref()),
126 }
127 );
128 Ok(())
129 }
130
131 async fn ensure_view_info_exists(&self, view_id: TableId) -> Result<()> {
132 self.context
133 .table_metadata_manager
134 .view_info_manager()
135 .get(view_id)
136 .await?
137 .with_context(|| error::ViewNotFoundSnafu {
138 view_name: self.data.table_ref().to_string(),
139 })?;
140 Ok(())
141 }
142
143 async fn on_delete_metadata(&mut self) -> Result<Status> {
144 let view_id = self.data.view_id();
145 self.context
146 .table_metadata_manager
147 .destroy_view_info(view_id, &self.data.table_ref().into())
148 .await?;
149
150 info!("Deleted view metadata for view {view_id}");
151
152 self.data.state = DropViewState::InvalidateViewCache;
153 Ok(Status::executing(true))
154 }
155
156 async fn on_broadcast(&mut self) -> Result<Status> {
157 let view_id = self.data.view_id();
158 let ctx = Context {
159 subject: Some("Invalidate view cache by dropping view".to_string()),
160 };
161
162 self.context
163 .cache_invalidator
164 .invalidate(
165 &ctx,
166 &[
167 CacheIdent::TableId(view_id),
168 CacheIdent::TableName(self.data.table_ref().into()),
169 ],
170 )
171 .await?;
172
173 Ok(Status::done())
174 }
175}
176
177#[async_trait]
178impl Procedure for DropViewProcedure {
179 fn type_name(&self) -> &str {
180 Self::TYPE_NAME
181 }
182
183 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
184 let state = &self.data.state;
185 let _timer = metrics::METRIC_META_PROCEDURE_DROP_VIEW
186 .with_label_values(&[state.as_ref()])
187 .start_timer();
188
189 match self.data.state {
190 DropViewState::Prepare => self.on_prepare().await,
191 DropViewState::DeleteMetadata => self.on_delete_metadata().await,
192 DropViewState::InvalidateViewCache => self.on_broadcast().await,
193 }
194 .map_err(handle_retry_error)
195 }
196
197 fn dump(&self) -> ProcedureResult<String> {
198 serde_json::to_string(&self.data).context(ToJsonSnafu)
199 }
200
201 fn lock_key(&self) -> LockKey {
202 let table_ref = &self.data.table_ref();
203 let view_id = self.data.view_id();
204 let lock_key = vec![
205 CatalogLock::Read(table_ref.catalog).into(),
206 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
207 TableLock::Write(view_id).into(),
208 ];
209
210 LockKey::new(lock_key)
211 }
212}
213
214#[derive(Debug, Serialize, Deserialize)]
216pub(crate) struct DropViewData {
217 state: DropViewState,
218 task: DropViewTask,
219}
220
221impl DropViewData {
222 fn table_ref(&self) -> TableReference {
223 self.task.table_ref()
224 }
225
226 fn view_id(&self) -> TableId {
227 self.task.view_id
228 }
229}
230
231#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq, Clone, Copy)]
233pub(crate) enum DropViewState {
234 Prepare,
236 DeleteMetadata,
238 InvalidateViewCache,
240}