common_meta/ddl/create_logical_tables/
update_metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use common_telemetry::{info, warn};
18use itertools::Itertools;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
25use crate::ddl::physical_table_metadata;
26use crate::error::{Result, TableInfoNotFoundSnafu};
27use crate::instruction::CacheIdent;
28
29impl CreateLogicalTablesProcedure {
30    pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31        if self.data.physical_columns.is_empty() {
32            warn!("No physical columns found, leaving the physical table's schema unchanged when creating logical tables");
33            return Ok(());
34        }
35
36        // Fetches old physical table's info
37        let physical_table_info = self
38            .context
39            .table_metadata_manager
40            .table_info_manager()
41            .get(self.data.physical_table_id)
42            .await?
43            .with_context(|| TableInfoNotFoundSnafu {
44                table: format!("table id - {}", self.data.physical_table_id),
45            })?;
46
47        // Generates new table info
48        let raw_table_info = physical_table_info.deref().table_info.clone();
49
50        let new_table_info = physical_table_metadata::build_new_physical_table_info(
51            raw_table_info,
52            &self.data.physical_columns,
53        );
54
55        let physical_table_name = TableName::new(
56            &new_table_info.catalog_name,
57            &new_table_info.schema_name,
58            &new_table_info.name,
59        );
60
61        // Update physical table's metadata and we don't need to touch per-region settings.
62        self.context
63            .table_metadata_manager
64            .update_table_info(&physical_table_info, None, new_table_info)
65            .await?;
66
67        // Invalid physical table cache
68        self.context
69            .cache_invalidator
70            .invalidate(
71                &Context::default(),
72                &[
73                    CacheIdent::TableId(self.data.physical_table_id),
74                    CacheIdent::TableName(physical_table_name),
75                ],
76            )
77            .await?;
78
79        Ok(())
80    }
81
82    pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
83        let remaining_tasks = self.data.remaining_tasks();
84        let num_tables = remaining_tasks.len();
85
86        if num_tables > 0 {
87            let chunk_size = self
88                .context
89                .table_metadata_manager
90                .create_logical_tables_metadata_chunk_size();
91            if num_tables > chunk_size {
92                let chunks = remaining_tasks
93                    .into_iter()
94                    .chunks(chunk_size)
95                    .into_iter()
96                    .map(|chunk| chunk.collect::<Vec<_>>())
97                    .collect::<Vec<_>>();
98                for chunk in chunks {
99                    self.context
100                        .table_metadata_manager
101                        .create_logical_tables_metadata(chunk)
102                        .await?;
103                }
104            } else {
105                self.context
106                    .table_metadata_manager
107                    .create_logical_tables_metadata(remaining_tasks)
108                    .await?;
109            }
110        }
111
112        // The `table_id` MUST be collected after the [Prepare::Prepare],
113        // ensures the all `table_id`s have been allocated.
114        let table_ids = self
115            .data
116            .tasks
117            .iter()
118            .map(|task| task.table_info.ident.table_id)
119            .collect::<Vec<_>>();
120
121        info!(
122            "Created {num_tables} tables {table_ids:?} metadata for physical table {}",
123            self.data.physical_table_id
124        );
125
126        Ok(table_ids)
127    }
128}