cli/metadata/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use async_stream::try_stream;
18use common_catalog::consts::METRIC_ENGINE;
19use common_catalog::format_full_table_name;
20use common_meta::key::table_name::TableNameKey;
21use common_meta::key::table_route::TableRouteValue;
22use common_meta::key::TableMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use futures::Stream;
25use snafu::{OptionExt, ResultExt};
26use store_api::storage::TableId;
27use table::metadata::RawTableInfo;
28
29use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu};
30
31/// The input for the iterator.
32pub enum IteratorInput {
33    TableIds(VecDeque<TableId>),
34    TableNames(VecDeque<(String, String, String)>),
35}
36
37impl IteratorInput {
38    /// Creates a new iterator input from a list of table ids.
39    pub fn new_table_ids(table_ids: Vec<TableId>) -> Self {
40        Self::TableIds(table_ids.into())
41    }
42
43    /// Creates a new iterator input from a list of table names.
44    pub fn new_table_names(table_names: Vec<(String, String, String)>) -> Self {
45        Self::TableNames(table_names.into())
46    }
47}
48
49/// An iterator for retrieving table metadata from the metadata store.
50///
51/// This struct provides functionality to iterate over table metadata based on
52/// either [`TableId`] and their associated regions or fully qualified table names.
53pub struct TableMetadataIterator {
54    input: IteratorInput,
55    table_metadata_manager: TableMetadataManager,
56}
57
58/// The full table metadata.
59pub struct FullTableMetadata {
60    pub table_id: TableId,
61    pub table_info: RawTableInfo,
62    pub table_route: TableRouteValue,
63}
64
65impl FullTableMetadata {
66    /// Returns true if it's [TableRouteValue::Physical].
67    pub fn is_physical_table(&self) -> bool {
68        self.table_route.is_physical()
69    }
70
71    /// Returns true if it's a metric engine table.
72    pub fn is_metric_engine(&self) -> bool {
73        self.table_info.meta.engine == METRIC_ENGINE
74    }
75
76    /// Returns the full table name.
77    pub fn full_table_name(&self) -> String {
78        format_full_table_name(
79            &self.table_info.catalog_name,
80            &self.table_info.schema_name,
81            &self.table_info.name,
82        )
83    }
84}
85
86impl TableMetadataIterator {
87    pub fn new(kvbackend: KvBackendRef, input: IteratorInput) -> Self {
88        let table_metadata_manager = TableMetadataManager::new(kvbackend);
89        Self {
90            input,
91            table_metadata_manager,
92        }
93    }
94
95    /// Returns the next table metadata.
96    ///
97    /// This method handles two types of inputs:
98    /// - TableIds: Returns metadata for a specific [`TableId`].
99    /// - TableNames: Returns metadata for a table identified by its full name (catalog.schema.table).
100    ///
101    /// Returns `None` when there are no more tables to process.
102    pub async fn next(&mut self) -> Result<Option<FullTableMetadata>> {
103        match &mut self.input {
104            IteratorInput::TableIds(table_ids) => {
105                if let Some(table_id) = table_ids.pop_front() {
106                    let full_table_metadata = self.get_table_metadata(table_id).await?;
107                    return Ok(Some(full_table_metadata));
108                }
109            }
110
111            IteratorInput::TableNames(table_names) => {
112                if let Some(full_table_name) = table_names.pop_front() {
113                    let table_id = self.get_table_id_by_name(full_table_name).await?;
114                    let full_table_metadata = self.get_table_metadata(table_id).await?;
115                    return Ok(Some(full_table_metadata));
116                }
117            }
118        }
119
120        Ok(None)
121    }
122
123    /// Converts the iterator into a stream of table metadata.
124    pub fn into_stream(mut self) -> impl Stream<Item = Result<FullTableMetadata>> {
125        try_stream!({
126            while let Some(full_table_metadata) = self.next().await? {
127                yield full_table_metadata;
128            }
129        })
130    }
131
132    async fn get_table_id_by_name(
133        &mut self,
134        (catalog_name, schema_name, table_name): (String, String, String),
135    ) -> Result<TableId> {
136        let key = TableNameKey::new(&catalog_name, &schema_name, &table_name);
137        let table_id = self
138            .table_metadata_manager
139            .table_name_manager()
140            .get(key)
141            .await
142            .context(TableMetadataSnafu)?
143            .with_context(|| UnexpectedSnafu {
144                msg: format!(
145                    "Table not found: {}",
146                    format_full_table_name(&catalog_name, &schema_name, &table_name)
147                ),
148            })?
149            .table_id();
150        Ok(table_id)
151    }
152
153    async fn get_table_metadata(&mut self, table_id: TableId) -> Result<FullTableMetadata> {
154        let (table_info, table_route) = self
155            .table_metadata_manager
156            .get_full_table_info(table_id)
157            .await
158            .context(TableMetadataSnafu)?;
159
160        let table_info = table_info
161            .with_context(|| UnexpectedSnafu {
162                msg: format!("Table info not found for table id: {table_id}"),
163            })?
164            .into_inner()
165            .table_info;
166        let table_route = table_route
167            .with_context(|| UnexpectedSnafu {
168                msg: format!("Table route not found for table id: {table_id}"),
169            })?
170            .into_inner();
171
172        Ok(FullTableMetadata {
173            table_id,
174            table_info,
175            table_route,
176        })
177    }
178}