common_meta/ddl/
create_view.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
17use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21use strum::AsRefStr;
22use table::metadata::{RawTableInfo, TableId, TableType};
23use table::table_reference::TableReference;
24
25use crate::cache_invalidator::Context;
26use crate::ddl::utils::handle_retry_error;
27use crate::ddl::{DdlContext, TableMetadata};
28use crate::error::{self, Result};
29use crate::instruction::CacheIdent;
30use crate::key::table_name::TableNameKey;
31use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
32use crate::metrics;
33use crate::rpc::ddl::CreateViewTask;
34
35// The procedure to execute `[CreateViewTask]`.
36pub struct CreateViewProcedure {
37    pub context: DdlContext,
38    pub data: CreateViewData,
39}
40
41impl CreateViewProcedure {
42    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateView";
43
44    pub fn new(task: CreateViewTask, context: DdlContext) -> Self {
45        Self {
46            context,
47            data: CreateViewData {
48                state: CreateViewState::Prepare,
49                task,
50                need_update: false,
51            },
52        }
53    }
54
55    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
56        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
57
58        Ok(CreateViewProcedure { context, data })
59    }
60
61    fn view_info(&self) -> &RawTableInfo {
62        &self.data.task.view_info
63    }
64
65    fn need_update(&self) -> bool {
66        self.data.need_update
67    }
68
69    pub(crate) fn view_id(&self) -> TableId {
70        self.view_info().ident.table_id
71    }
72
73    #[cfg(any(test, feature = "testing"))]
74    pub fn set_allocated_metadata(&mut self, view_id: TableId) {
75        self.data.set_allocated_metadata(view_id, false)
76    }
77
78    /// On the prepare step, it performs:
79    /// - Checks whether the view exists.
80    /// - Allocates the view id.
81    ///
82    /// Abort(non-retry):
83    /// - ViewName exists and `create_if_not_exists` is false.
84    /// - Failed to allocate [ViewMetadata].
85    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
86        let expr = &self.data.task.create_view;
87        let view_name_value = self
88            .context
89            .table_metadata_manager
90            .table_name_manager()
91            .get(TableNameKey::new(
92                &expr.catalog_name,
93                &expr.schema_name,
94                &expr.view_name,
95            ))
96            .await?;
97
98        // If `view_id` is None, creating the new view,
99        // otherwise:
100        // - replaces the exists one when `or_replace` is true.
101        // - returns the exists one when `create_if_not_exists` is true.
102        // - throws the `[ViewAlreadyExistsSnafu]` error.
103        let mut view_id = None;
104
105        if let Some(value) = view_name_value {
106            ensure!(
107                expr.create_if_not_exists || expr.or_replace,
108                error::ViewAlreadyExistsSnafu {
109                    view_name: self.data.table_ref().to_string(),
110                }
111            );
112
113            let exists_view_id = value.table_id();
114
115            if !expr.or_replace {
116                return Ok(Status::done_with_output(exists_view_id));
117            }
118            view_id = Some(exists_view_id);
119        }
120
121        if let Some(view_id) = view_id {
122            let view_info_value = self
123                .context
124                .table_metadata_manager
125                .table_info_manager()
126                .get(view_id)
127                .await?
128                .with_context(|| error::TableInfoNotFoundSnafu {
129                    table: self.data.table_ref().to_string(),
130                })?;
131
132            // Ensure the exists one is view, we can't replace a table.
133            ensure!(
134                view_info_value.table_info.table_type == TableType::View,
135                error::TableAlreadyExistsSnafu {
136                    table_name: self.data.table_ref().to_string(),
137                }
138            );
139
140            self.data.set_allocated_metadata(view_id, true);
141        } else {
142            // Allocate the new `view_id`.
143            let TableMetadata { table_id, .. } = self
144                .context
145                .table_metadata_allocator
146                .create_view(&None)
147                .await?;
148            self.data.set_allocated_metadata(table_id, false);
149        }
150
151        self.data.state = CreateViewState::CreateMetadata;
152
153        Ok(Status::executing(true))
154    }
155
156    async fn invalidate_view_cache(&self) -> Result<()> {
157        let cache_invalidator = &self.context.cache_invalidator;
158        let ctx = Context {
159            subject: Some("Invalidate view cache by creating view".to_string()),
160        };
161
162        cache_invalidator
163            .invalidate(
164                &ctx,
165                &[
166                    CacheIdent::TableName(self.data.table_ref().into()),
167                    CacheIdent::TableId(self.view_id()),
168                ],
169            )
170            .await?;
171
172        Ok(())
173    }
174
175    /// Creates view metadata
176    ///
177    /// Abort(not-retry):
178    /// - Failed to create view metadata.
179    async fn on_create_metadata(&mut self, ctx: &ProcedureContext) -> Result<Status> {
180        let view_id = self.view_id();
181        let manager = &self.context.table_metadata_manager;
182
183        if self.need_update() {
184            // Retrieve the current view info and try to update it.
185            let current_view_info = manager
186                .view_info_manager()
187                .get(view_id)
188                .await?
189                .with_context(|| error::ViewNotFoundSnafu {
190                    view_name: self.data.table_ref().to_string(),
191                })?;
192            let new_logical_plan = self.data.task.raw_logical_plan().clone();
193            let table_names = self.data.task.table_names();
194            let columns = self.data.task.columns().clone();
195            let plan_columns = self.data.task.plan_columns().clone();
196            let new_view_definition = self.data.task.view_definition().to_string();
197
198            manager
199                .update_view_info(
200                    view_id,
201                    &current_view_info,
202                    new_logical_plan,
203                    table_names,
204                    columns,
205                    plan_columns,
206                    new_view_definition,
207                )
208                .await?;
209
210            info!("Updated view metadata for view {view_id}");
211        } else {
212            let raw_view_info = self.view_info().clone();
213            manager
214                .create_view_metadata(
215                    raw_view_info,
216                    self.data.task.raw_logical_plan().clone(),
217                    self.data.task.table_names(),
218                    self.data.task.columns().clone(),
219                    self.data.task.plan_columns().clone(),
220                    self.data.task.view_definition().to_string(),
221                )
222                .await?;
223
224            info!(
225                "Created view metadata for view {view_id} with procedure: {}",
226                ctx.procedure_id
227            );
228        }
229        self.invalidate_view_cache().await?;
230
231        Ok(Status::done_with_output(view_id))
232    }
233}
234
235#[async_trait]
236impl Procedure for CreateViewProcedure {
237    fn type_name(&self) -> &str {
238        Self::TYPE_NAME
239    }
240
241    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
242        let state = &self.data.state;
243
244        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW
245            .with_label_values(&[state.as_ref()])
246            .start_timer();
247
248        match state {
249            CreateViewState::Prepare => self.on_prepare().await,
250            CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await,
251        }
252        .map_err(handle_retry_error)
253    }
254
255    fn dump(&self) -> ProcedureResult<String> {
256        serde_json::to_string(&self.data).context(ToJsonSnafu)
257    }
258
259    fn lock_key(&self) -> LockKey {
260        let table_ref = &self.data.table_ref();
261
262        LockKey::new(vec![
263            CatalogLock::Read(table_ref.catalog).into(),
264            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
265            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
266        ])
267    }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
271pub enum CreateViewState {
272    /// Prepares to create the table
273    Prepare,
274    /// Creates metadata
275    CreateMetadata,
276}
277
278#[derive(Debug, Serialize, Deserialize)]
279pub struct CreateViewData {
280    pub state: CreateViewState,
281    pub task: CreateViewTask,
282    /// Whether to update the view info.
283    pub need_update: bool,
284}
285
286impl CreateViewData {
287    fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) {
288        self.task.view_info.ident.table_id = view_id;
289        self.need_update = need_update;
290    }
291
292    fn table_ref(&self) -> TableReference<'_> {
293        self.task.table_ref()
294    }
295}