1use std::collections::VecDeque;
16
17use async_stream::try_stream;
18use common_catalog::consts::METRIC_ENGINE;
19use common_catalog::format_full_table_name;
20use common_meta::key::table_name::TableNameKey;
21use common_meta::key::table_route::TableRouteValue;
22use common_meta::key::TableMetadataManager;
23use common_meta::kv_backend::KvBackendRef;
24use futures::Stream;
25use snafu::{OptionExt, ResultExt};
26use store_api::storage::TableId;
27use table::metadata::RawTableInfo;
28
29use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu};
30
31pub enum IteratorInput {
33 TableIds(VecDeque<TableId>),
34 TableNames(VecDeque<(String, String, String)>),
35}
36
37impl IteratorInput {
38 pub fn new_table_ids(table_ids: Vec<TableId>) -> Self {
40 Self::TableIds(table_ids.into())
41 }
42
43 pub fn new_table_names(table_names: Vec<(String, String, String)>) -> Self {
45 Self::TableNames(table_names.into())
46 }
47}
48
49pub struct TableMetadataIterator {
54 input: IteratorInput,
55 table_metadata_manager: TableMetadataManager,
56}
57
58pub struct FullTableMetadata {
60 pub table_id: TableId,
61 pub table_info: RawTableInfo,
62 pub table_route: TableRouteValue,
63}
64
65impl FullTableMetadata {
66 pub fn is_physical_table(&self) -> bool {
68 self.table_route.is_physical()
69 }
70
71 pub fn is_metric_engine(&self) -> bool {
73 self.table_info.meta.engine == METRIC_ENGINE
74 }
75
76 pub fn full_table_name(&self) -> String {
78 format_full_table_name(
79 &self.table_info.catalog_name,
80 &self.table_info.schema_name,
81 &self.table_info.name,
82 )
83 }
84}
85
86impl TableMetadataIterator {
87 pub fn new(kvbackend: KvBackendRef, input: IteratorInput) -> Self {
88 let table_metadata_manager = TableMetadataManager::new(kvbackend);
89 Self {
90 input,
91 table_metadata_manager,
92 }
93 }
94
95 pub async fn next(&mut self) -> Result<Option<FullTableMetadata>> {
103 match &mut self.input {
104 IteratorInput::TableIds(table_ids) => {
105 if let Some(table_id) = table_ids.pop_front() {
106 let full_table_metadata = self.get_table_metadata(table_id).await?;
107 return Ok(Some(full_table_metadata));
108 }
109 }
110
111 IteratorInput::TableNames(table_names) => {
112 if let Some(full_table_name) = table_names.pop_front() {
113 let table_id = self.get_table_id_by_name(full_table_name).await?;
114 let full_table_metadata = self.get_table_metadata(table_id).await?;
115 return Ok(Some(full_table_metadata));
116 }
117 }
118 }
119
120 Ok(None)
121 }
122
123 pub fn into_stream(mut self) -> impl Stream<Item = Result<FullTableMetadata>> {
125 try_stream!({
126 while let Some(full_table_metadata) = self.next().await? {
127 yield full_table_metadata;
128 }
129 })
130 }
131
132 async fn get_table_id_by_name(
133 &mut self,
134 (catalog_name, schema_name, table_name): (String, String, String),
135 ) -> Result<TableId> {
136 let key = TableNameKey::new(&catalog_name, &schema_name, &table_name);
137 let table_id = self
138 .table_metadata_manager
139 .table_name_manager()
140 .get(key)
141 .await
142 .context(TableMetadataSnafu)?
143 .with_context(|| UnexpectedSnafu {
144 msg: format!(
145 "Table not found: {}",
146 format_full_table_name(&catalog_name, &schema_name, &table_name)
147 ),
148 })?
149 .table_id();
150 Ok(table_id)
151 }
152
153 async fn get_table_metadata(&mut self, table_id: TableId) -> Result<FullTableMetadata> {
154 let (table_info, table_route) = self
155 .table_metadata_manager
156 .get_full_table_info(table_id)
157 .await
158 .context(TableMetadataSnafu)?;
159
160 let table_info = table_info
161 .with_context(|| UnexpectedSnafu {
162 msg: format!("Table info not found for table id: {table_id}"),
163 })?
164 .into_inner()
165 .table_info;
166 let table_route = table_route
167 .with_context(|| UnexpectedSnafu {
168 msg: format!("Table route not found for table id: {table_id}"),
169 })?
170 .into_inner();
171
172 Ok(FullTableMetadata {
173 table_id,
174 table_info,
175 table_route,
176 })
177 }
178}