common_meta/ddl/
create_view.rs1use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use strum::AsRefStr;
22use table::metadata::{RawTableInfo, TableId, TableType};
23use table::table_reference::TableReference;
24
25use crate::cache_invalidator::Context;
26use crate::ddl::utils::handle_retry_error;
27use crate::ddl::{DdlContext, TableMetadata};
28use crate::error::{self, Result};
29use crate::instruction::CacheIdent;
30use crate::key::table_name::TableNameKey;
31use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
32use crate::metrics;
33use crate::rpc::ddl::CreateViewTask;
34
35pub struct CreateViewProcedure {
37 pub context: DdlContext,
38 pub data: CreateViewData,
39}
40
41impl CreateViewProcedure {
42 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateView";
43
44 pub fn new(task: CreateViewTask, context: DdlContext) -> Self {
45 Self {
46 context,
47 data: CreateViewData {
48 state: CreateViewState::Prepare,
49 task,
50 need_update: false,
51 },
52 }
53 }
54
55 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
56 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
57
58 Ok(CreateViewProcedure { context, data })
59 }
60
61 fn view_info(&self) -> &RawTableInfo {
62 &self.data.task.view_info
63 }
64
65 fn need_update(&self) -> bool {
66 self.data.need_update
67 }
68
69 pub(crate) fn view_id(&self) -> TableId {
70 self.view_info().ident.table_id
71 }
72
73 #[cfg(any(test, feature = "testing"))]
74 pub fn set_allocated_metadata(&mut self, view_id: TableId) {
75 self.data.set_allocated_metadata(view_id, false)
76 }
77
78 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86 let expr = &self.data.task.create_view;
87 let view_name_value = self
88 .context
89 .table_metadata_manager
90 .table_name_manager()
91 .get(TableNameKey::new(
92 &expr.catalog_name,
93 &expr.schema_name,
94 &expr.view_name,
95 ))
96 .await?;
97
98 let mut view_id = None;
104
105 if let Some(value) = view_name_value {
106 ensure!(
107 expr.create_if_not_exists || expr.or_replace,
108 error::ViewAlreadyExistsSnafu {
109 view_name: self.data.table_ref().to_string(),
110 }
111 );
112
113 let exists_view_id = value.table_id();
114
115 if !expr.or_replace {
116 return Ok(Status::done_with_output(exists_view_id));
117 }
118 view_id = Some(exists_view_id);
119 }
120
121 if let Some(view_id) = view_id {
122 let view_info_value = self
123 .context
124 .table_metadata_manager
125 .table_info_manager()
126 .get(view_id)
127 .await?
128 .with_context(|| error::TableInfoNotFoundSnafu {
129 table: self.data.table_ref().to_string(),
130 })?;
131
132 ensure!(
134 view_info_value.table_info.table_type == TableType::View,
135 error::TableAlreadyExistsSnafu {
136 table_name: self.data.table_ref().to_string(),
137 }
138 );
139
140 self.data.set_allocated_metadata(view_id, true);
141 } else {
142 let TableMetadata { table_id, .. } = self
144 .context
145 .table_metadata_allocator
146 .create_view(&None)
147 .await?;
148 self.data.set_allocated_metadata(table_id, false);
149 }
150
151 self.data.state = CreateViewState::CreateMetadata;
152
153 Ok(Status::executing(true))
154 }
155
156 async fn invalidate_view_cache(&self) -> Result<()> {
157 let cache_invalidator = &self.context.cache_invalidator;
158 let ctx = Context {
159 subject: Some("Invalidate view cache by creating view".to_string()),
160 };
161
162 cache_invalidator
163 .invalidate(
164 &ctx,
165 &[
166 CacheIdent::TableName(self.data.table_ref().into()),
167 CacheIdent::TableId(self.view_id()),
168 ],
169 )
170 .await?;
171
172 Ok(())
173 }
174
175 async fn on_create_metadata(&mut self, ctx: &ProcedureContext) -> Result<Status> {
180 let view_id = self.view_id();
181 let manager = &self.context.table_metadata_manager;
182
183 if self.need_update() {
184 let current_view_info = manager
186 .view_info_manager()
187 .get(view_id)
188 .await?
189 .with_context(|| error::ViewNotFoundSnafu {
190 view_name: self.data.table_ref().to_string(),
191 })?;
192 let new_logical_plan = self.data.task.raw_logical_plan().clone();
193 let table_names = self.data.task.table_names();
194 let columns = self.data.task.columns().clone();
195 let plan_columns = self.data.task.plan_columns().clone();
196 let new_view_definition = self.data.task.view_definition().to_string();
197
198 manager
199 .update_view_info(
200 view_id,
201 ¤t_view_info,
202 new_logical_plan,
203 table_names,
204 columns,
205 plan_columns,
206 new_view_definition,
207 )
208 .await?;
209
210 info!("Updated view metadata for view {view_id}");
211 } else {
212 let raw_view_info = self.view_info().clone();
213 manager
214 .create_view_metadata(
215 raw_view_info,
216 self.data.task.raw_logical_plan().clone(),
217 self.data.task.table_names(),
218 self.data.task.columns().clone(),
219 self.data.task.plan_columns().clone(),
220 self.data.task.view_definition().to_string(),
221 )
222 .await?;
223
224 info!(
225 "Created view metadata for view {view_id} with procedure: {}",
226 ctx.procedure_id
227 );
228 }
229 self.invalidate_view_cache().await?;
230
231 Ok(Status::done_with_output(view_id))
232 }
233}
234
235#[async_trait]
236impl Procedure for CreateViewProcedure {
237 fn type_name(&self) -> &str {
238 Self::TYPE_NAME
239 }
240
241 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
242 let state = &self.data.state;
243
244 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW
245 .with_label_values(&[state.as_ref()])
246 .start_timer();
247
248 match state {
249 CreateViewState::Prepare => self.on_prepare().await,
250 CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await,
251 }
252 .map_err(handle_retry_error)
253 }
254
255 fn dump(&self) -> ProcedureResult<String> {
256 serde_json::to_string(&self.data).context(ToJsonSnafu)
257 }
258
259 fn lock_key(&self) -> LockKey {
260 let table_ref = &self.data.table_ref();
261
262 LockKey::new(vec![
263 CatalogLock::Read(table_ref.catalog).into(),
264 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
265 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
266 ])
267 }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
271pub enum CreateViewState {
272 Prepare,
274 CreateMetadata,
276}
277
278#[derive(Debug, Serialize, Deserialize)]
279pub struct CreateViewData {
280 pub state: CreateViewState,
281 pub task: CreateViewTask,
282 pub need_update: bool,
284}
285
286impl CreateViewData {
287 fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) {
288 self.task.view_info.ident.table_id = view_id;
289 self.need_update = need_update;
290 }
291
292 fn table_ref(&self) -> TableReference<'_> {
293 self.task.table_ref()
294 }
295}