1use common_error::ext::BoxedError;
18use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
19use common_meta::key::table_name::{TableNameKey, TableNameManager};
20use datatypes::schema::ColumnDefaultConstraint;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use table::metadata::TableId;
24
25use crate::adapter::util::table_info_value_to_relation_desc;
26use crate::adapter::TableName;
27use crate::error::{
28 Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
29};
30use crate::repr::RelationDesc;
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
34pub struct TableDesc {
35 pub relation_desc: RelationDesc,
36 pub default_values: Vec<Option<ColumnDefaultConstraint>>,
37}
38
39impl TableDesc {
40 pub fn new(
41 relation_desc: RelationDesc,
42 default_values: Vec<Option<ColumnDefaultConstraint>>,
43 ) -> Self {
44 Self {
45 relation_desc,
46 default_values,
47 }
48 }
49
50 pub fn new_no_default(relation_desc: RelationDesc) -> Self {
51 Self {
52 relation_desc,
53 default_values: vec![],
54 }
55 }
56}
57
58#[async_trait::async_trait]
60pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
61 async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
62 async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
63
64 async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
66 let id = self.table_id_from_name(name).await?;
67 self.table_from_id(&id).await
68 }
69 async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
70}
71
72#[derive(Clone)]
74pub struct ManagedTableSource {
75 table_info_manager: TableInfoManager,
77 table_name_manager: TableNameManager,
78}
79
80#[async_trait::async_trait]
81impl FlowTableSource for ManagedTableSource {
82 async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
83 let table_info_value = self
84 .get_table_info_value(table_id)
85 .await?
86 .with_context(|| TableNotFoundSnafu {
87 name: format!("TableId = {:?}, Can't found table info", table_id),
88 })?;
89 let desc = table_info_value_to_relation_desc(table_info_value)?;
90
91 Ok(desc)
92 }
93 async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
94 self.get_table_name(table_id).await
95 }
96 async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
97 self.get_opt_table_id_from_name(name)
98 .await?
99 .with_context(|| TableNotFoundSnafu {
100 name: name.join("."),
101 })
102 }
103}
104
105impl ManagedTableSource {
106 pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
107 ManagedTableSource {
108 table_info_manager,
109 table_name_manager,
110 }
111 }
112
113 pub async fn get_time_index_column_from_table_id(
115 &self,
116 table_id: TableId,
117 ) -> Result<(usize, datatypes::schema::ColumnSchema), Error> {
118 let info = self
119 .table_info_manager
120 .get(table_id)
121 .await
122 .map_err(BoxedError::new)
123 .context(ExternalSnafu)?
124 .context(UnexpectedSnafu {
125 reason: format!("Table id = {:?}, couldn't found table info", table_id),
126 })?;
127 let raw_schema = &info.table_info.meta.schema;
128 let Some(ts_index) = raw_schema.timestamp_index else {
129 UnexpectedSnafu {
130 reason: format!("Table id = {:?}, couldn't found timestamp index", table_id),
131 }
132 .fail()?
133 };
134 let col_schema = raw_schema.column_schemas[ts_index].clone();
135 Ok((ts_index, col_schema))
136 }
137
138 pub async fn get_table_id_from_proto_name(
139 &self,
140 name: &greptime_proto::v1::TableName,
141 ) -> Result<TableId, Error> {
142 self.table_name_manager
143 .get(TableNameKey::new(
144 &name.catalog_name,
145 &name.schema_name,
146 &name.table_name,
147 ))
148 .await
149 .with_context(|_| TableNotFoundMetaSnafu {
150 msg: format!("Table name = {:?}, couldn't found table id", name),
151 })?
152 .with_context(|| UnexpectedSnafu {
153 reason: format!("Table name = {:?}, couldn't found table id", name),
154 })
155 .map(|id| id.table_id())
156 }
157
158 pub async fn get_opt_table_id_from_name(
160 &self,
161 name: &TableName,
162 ) -> Result<Option<TableId>, Error> {
163 let ret = self
164 .table_name_manager
165 .get(TableNameKey::new(&name[0], &name[1], &name[2]))
166 .await
167 .with_context(|_| TableNotFoundMetaSnafu {
168 msg: format!("Table name = {:?}, couldn't found table id", name),
169 })?
170 .map(|id| id.table_id());
171 Ok(ret)
172 }
173
174 pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
176 self.table_info_manager
177 .get(*table_id)
178 .await
179 .map_err(BoxedError::new)
180 .context(ExternalSnafu)?
181 .with_context(|| UnexpectedSnafu {
182 reason: format!("Table id = {:?}, couldn't found table name", table_id),
183 })
184 .map(|name| name.table_name())
185 .map(|name| [name.catalog_name, name.schema_name, name.table_name])
186 }
187
188 pub async fn get_table_info_value(
190 &self,
191 table_id: &TableId,
192 ) -> Result<Option<TableInfoValue>, Error> {
193 Ok(self
194 .table_info_manager
195 .get(*table_id)
196 .await
197 .with_context(|_| TableNotFoundMetaSnafu {
198 msg: format!("TableId = {:?}, couldn't found table name", table_id),
199 })?
200 .map(|v| v.into_inner()))
201 }
202
203 pub async fn get_table_name_schema(
204 &self,
205 table_id: &TableId,
206 ) -> Result<(TableName, TableDesc), Error> {
207 let table_info_value = self
208 .get_table_info_value(table_id)
209 .await?
210 .with_context(|| TableNotFoundSnafu {
211 name: format!("TableId = {:?}, Can't found table info", table_id),
212 })?;
213
214 let table_name = table_info_value.table_name();
215 let table_name = [
216 table_name.catalog_name,
217 table_name.schema_name,
218 table_name.table_name,
219 ];
220
221 let desc = table_info_value_to_relation_desc(table_info_value)?;
222 Ok((table_name, desc))
223 }
224
225 pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
226 self.table_info_manager
227 .exists(*table_id)
228 .await
229 .map_err(BoxedError::new)
230 .context(ExternalSnafu)
231 }
232}
233
234impl std::fmt::Debug for ManagedTableSource {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 f.debug_struct("KvBackendTableSource").finish()
237 }
238}
239
240#[cfg(test)]
241pub(crate) mod test {
242 use std::collections::HashMap;
243
244 use datatypes::data_type::ConcreteDataType as CDT;
245
246 use super::*;
247 use crate::repr::{ColumnType, RelationType};
248
249 pub struct FlowDummyTableSource {
250 pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
251 id_to_idx: HashMap<TableId, usize>,
252 name_to_idx: HashMap<TableName, usize>,
253 }
254
255 impl Default for FlowDummyTableSource {
256 fn default() -> Self {
257 let id_names_to_desc = vec![
258 (
259 1024,
260 [
261 "greptime".to_string(),
262 "public".to_string(),
263 "numbers".to_string(),
264 ],
265 TableDesc::new_no_default(
266 RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
267 .into_named(vec![Some("number".to_string())]),
268 ),
269 ),
270 (
271 1025,
272 [
273 "greptime".to_string(),
274 "public".to_string(),
275 "numbers_with_ts".to_string(),
276 ],
277 TableDesc::new_no_default(
278 RelationType::new(vec![
279 ColumnType::new(CDT::uint32_datatype(), false),
280 ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
281 ])
282 .into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
283 ),
284 ),
285 ];
286 let id_to_idx = id_names_to_desc
287 .iter()
288 .enumerate()
289 .map(|(idx, (id, _name, _desc))| (*id, idx))
290 .collect();
291 let name_to_idx = id_names_to_desc
292 .iter()
293 .enumerate()
294 .map(|(idx, (_id, name, _desc))| (name.clone(), idx))
295 .collect();
296 Self {
297 id_names_to_desc,
298 id_to_idx,
299 name_to_idx,
300 }
301 }
302 }
303
304 #[async_trait::async_trait]
305 impl FlowTableSource for FlowDummyTableSource {
306 async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
307 let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
308 name: format!("Table id = {:?}, couldn't found table desc", table_id),
309 })?;
310 let desc = self
311 .id_names_to_desc
312 .get(*idx)
313 .map(|x| x.2.clone())
314 .context(TableNotFoundSnafu {
315 name: format!("Table id = {:?}, couldn't found table desc", table_id),
316 })?;
317 Ok(desc)
318 }
319
320 async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
321 let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
322 name: format!("Table id = {:?}, couldn't found table desc", table_id),
323 })?;
324 self.id_names_to_desc
325 .get(*idx)
326 .map(|x| x.1.clone())
327 .context(TableNotFoundSnafu {
328 name: format!("Table id = {:?}, couldn't found table desc", table_id),
329 })
330 }
331
332 async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
333 for (id, table_name, _desc) in &self.id_names_to_desc {
334 if name == table_name {
335 return Ok(*id);
336 }
337 }
338 TableNotFoundSnafu {
339 name: format!("Table name = {:?}, couldn't found table id", name),
340 }
341 .fail()?
342 }
343 }
344
345 impl std::fmt::Debug for FlowDummyTableSource {
346 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 f.debug_struct("DummyTableSource").finish()
348 }
349 }
350}