common_meta/ddl/create_logical_tables/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use common_telemetry::{info, warn};
18use itertools::Itertools;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
25use crate::ddl::utils::raw_table_info;
26use crate::error::{Result, TableInfoNotFoundSnafu};
27use crate::instruction::CacheIdent;
28
29impl CreateLogicalTablesProcedure {
30    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31        if self.data.physical_columns.is_empty() {
32            warn!(
33                "No physical columns found, leaving the physical table's schema unchanged when creating logical tables"
34            );
35            return Ok(());
36        }
37
38        // Fetches old physical table's info
39        let physical_table_info = self
40            .context
41            .table_metadata_manager
42            .table_info_manager()
43            .get(self.data.physical_table_id)
44            .await?
45            .with_context(|| TableInfoNotFoundSnafu {
46                table: format!("table id - {}", self.data.physical_table_id),
47            })?;
48
49        // Generates new table info
50        let raw_table_info = physical_table_info.deref().table_info.clone();
51
52        let new_table_info = raw_table_info::build_new_physical_table_info(
53            raw_table_info,
54            &self.data.physical_columns,
55        );
56
57        let physical_table_name = TableName::new(
58            &new_table_info.catalog_name,
59            &new_table_info.schema_name,
60            &new_table_info.name,
61        );
62
63        // Update physical table's metadata and we don't need to touch per-region settings.
64        self.context
65            .table_metadata_manager
66            .update_table_info(&physical_table_info, None, new_table_info)
67            .await?;
68
69        // Invalid physical table cache
70        self.context
71            .cache_invalidator
72            .invalidate(
73                &Context::default(),
74                &[
75                    CacheIdent::TableId(self.data.physical_table_id),
76                    CacheIdent::TableName(physical_table_name),
77                ],
78            )
79            .await?;
80
81        Ok(())
82    }
83
84    pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
85        let remaining_tasks = self.data.remaining_tasks();
86        let num_tables = remaining_tasks.len();
87
88        if num_tables > 0 {
89            let chunk_size = self
90                .context
91                .table_metadata_manager
92                .create_logical_tables_metadata_chunk_size();
93            if num_tables > chunk_size {
94                let chunks = remaining_tasks
95                    .into_iter()
96                    .chunks(chunk_size)
97                    .into_iter()
98                    .map(|chunk| chunk.collect::<Vec<_>>())
99                    .collect::<Vec<_>>();
100                for chunk in chunks {
101                    self.context
102                        .table_metadata_manager
103                        .create_logical_tables_metadata(chunk)
104                        .await?;
105                }
106            } else {
107                self.context
108                    .table_metadata_manager
109                    .create_logical_tables_metadata(remaining_tasks)
110                    .await?;
111            }
112        }
113
114        // The `table_id` MUST be collected after the [Prepare::Prepare],
115        // ensures the all `table_id`s have been allocated.
116        let table_ids = self
117            .data
118            .tasks
119            .iter()
120            .map(|task| task.table_info.ident.table_id)
121            .collect::<Vec<_>>();
122
123        info!(
124            "Created {num_tables} tables {table_ids:?} metadata for physical table {}",
125            self.data.physical_table_id
126        );
127
128        Ok(table_ids)
129    }
130}