flow/adapter/
table_source.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! How to query table information from database
16
17use common_error::ext::BoxedError;
18use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
19use common_meta::key::table_name::{TableNameKey, TableNameManager};
20use datatypes::schema::ColumnDefaultConstraint;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use table::metadata::TableId;
24
25use crate::adapter::util::table_info_value_to_relation_desc;
26use crate::adapter::TableName;
27use crate::error::{
28    Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
29};
30use crate::repr::RelationDesc;
31
/// Table description: the relation description together with the columns' default values,
/// which is the minimal table information a flow needs.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableDesc {
    /// Relation description of the table.
    pub relation_desc: RelationDesc,
    /// Default-value constraints; a `None` entry carries no constraint. The list is empty when
    /// the description is built with `TableDesc::new_no_default`.
    /// NOTE(review): presumably one entry per column of `relation_desc`, in column order — confirm.
    pub default_values: Vec<Option<ColumnDefaultConstraint>>,
}
38
39impl TableDesc {
40    pub fn new(
41        relation_desc: RelationDesc,
42        default_values: Vec<Option<ColumnDefaultConstraint>>,
43    ) -> Self {
44        Self {
45            relation_desc,
46            default_values,
47        }
48    }
49
50    pub fn new_no_default(relation_desc: RelationDesc) -> Self {
51        Self {
52            relation_desc,
53            default_values: vec![],
54        }
55    }
56}
57
58/// Table source but for flow, provide table schema by table name/id
59#[async_trait::async_trait]
60pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
61    async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
62    async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
63
64    /// Get the table schema by table name
65    async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
66        let id = self.table_id_from_name(name).await?;
67        self.table_from_id(&id).await
68    }
69    async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
70}
71
/// Managed table source: answers table queries through the table info manager and the
/// table name manager.
#[derive(Clone)]
pub struct ManagedTableSource {
    /// Looks up table info by `TableId`; used for the `TableId -> TableName` mapping and for
    /// reading the table schema.
    table_info_manager: TableInfoManager,
    /// Looks up the `TableId` registered under a catalog/schema/table name.
    table_name_manager: TableNameManager,
}
79
80#[async_trait::async_trait]
81impl FlowTableSource for ManagedTableSource {
82    async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
83        let table_info_value = self
84            .get_table_info_value(table_id)
85            .await?
86            .with_context(|| TableNotFoundSnafu {
87                name: format!("TableId = {:?}, Can't found table info", table_id),
88            })?;
89        let desc = table_info_value_to_relation_desc(table_info_value)?;
90
91        Ok(desc)
92    }
93    async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
94        self.get_table_name(table_id).await
95    }
96    async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
97        self.get_opt_table_id_from_name(name)
98            .await?
99            .with_context(|| TableNotFoundSnafu {
100                name: name.join("."),
101            })
102    }
103}
104
105impl ManagedTableSource {
106    pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
107        ManagedTableSource {
108            table_info_manager,
109            table_name_manager,
110        }
111    }
112
113    /// Get the time index column from table id
114    pub async fn get_time_index_column_from_table_id(
115        &self,
116        table_id: TableId,
117    ) -> Result<(usize, datatypes::schema::ColumnSchema), Error> {
118        let info = self
119            .table_info_manager
120            .get(table_id)
121            .await
122            .map_err(BoxedError::new)
123            .context(ExternalSnafu)?
124            .context(UnexpectedSnafu {
125                reason: format!("Table id = {:?}, couldn't found table info", table_id),
126            })?;
127        let raw_schema = &info.table_info.meta.schema;
128        let Some(ts_index) = raw_schema.timestamp_index else {
129            UnexpectedSnafu {
130                reason: format!("Table id = {:?}, couldn't found timestamp index", table_id),
131            }
132            .fail()?
133        };
134        let col_schema = raw_schema.column_schemas[ts_index].clone();
135        Ok((ts_index, col_schema))
136    }
137
138    pub async fn get_table_id_from_proto_name(
139        &self,
140        name: &greptime_proto::v1::TableName,
141    ) -> Result<TableId, Error> {
142        self.table_name_manager
143            .get(TableNameKey::new(
144                &name.catalog_name,
145                &name.schema_name,
146                &name.table_name,
147            ))
148            .await
149            .with_context(|_| TableNotFoundMetaSnafu {
150                msg: format!("Table name = {:?}, couldn't found table id", name),
151            })?
152            .with_context(|| UnexpectedSnafu {
153                reason: format!("Table name = {:?}, couldn't found table id", name),
154            })
155            .map(|id| id.table_id())
156    }
157
158    /// If the table haven't been created in database, the tableId returned would be null
159    pub async fn get_opt_table_id_from_name(
160        &self,
161        name: &TableName,
162    ) -> Result<Option<TableId>, Error> {
163        let ret = self
164            .table_name_manager
165            .get(TableNameKey::new(&name[0], &name[1], &name[2]))
166            .await
167            .with_context(|_| TableNotFoundMetaSnafu {
168                msg: format!("Table name = {:?}, couldn't found table id", name),
169            })?
170            .map(|id| id.table_id());
171        Ok(ret)
172    }
173
174    /// query metasrv about the table name and table id
175    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
176        self.table_info_manager
177            .get(*table_id)
178            .await
179            .map_err(BoxedError::new)
180            .context(ExternalSnafu)?
181            .with_context(|| UnexpectedSnafu {
182                reason: format!("Table id = {:?}, couldn't found table name", table_id),
183            })
184            .map(|name| name.table_name())
185            .map(|name| [name.catalog_name, name.schema_name, name.table_name])
186    }
187
188    /// query metasrv about the `TableInfoValue` and table id
189    pub async fn get_table_info_value(
190        &self,
191        table_id: &TableId,
192    ) -> Result<Option<TableInfoValue>, Error> {
193        Ok(self
194            .table_info_manager
195            .get(*table_id)
196            .await
197            .with_context(|_| TableNotFoundMetaSnafu {
198                msg: format!("TableId = {:?}, couldn't found table name", table_id),
199            })?
200            .map(|v| v.into_inner()))
201    }
202
203    pub async fn get_table_name_schema(
204        &self,
205        table_id: &TableId,
206    ) -> Result<(TableName, TableDesc), Error> {
207        let table_info_value = self
208            .get_table_info_value(table_id)
209            .await?
210            .with_context(|| TableNotFoundSnafu {
211                name: format!("TableId = {:?}, Can't found table info", table_id),
212            })?;
213
214        let table_name = table_info_value.table_name();
215        let table_name = [
216            table_name.catalog_name,
217            table_name.schema_name,
218            table_name.table_name,
219        ];
220
221        let desc = table_info_value_to_relation_desc(table_info_value)?;
222        Ok((table_name, desc))
223    }
224
225    pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
226        self.table_info_manager
227            .exists(*table_id)
228            .await
229            .map_err(BoxedError::new)
230            .context(ExternalSnafu)
231    }
232}
233
234impl std::fmt::Debug for ManagedTableSource {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        f.debug_struct("KvBackendTableSource").finish()
237    }
238}
239
#[cfg(test)]
pub(crate) mod test {
    use std::collections::HashMap;

    use datatypes::data_type::ConcreteDataType as CDT;

    use super::*;
    use crate::repr::{ColumnType, RelationType};

    /// In-memory [`FlowTableSource`] for tests; [`Default`] pre-populates it with a fixed set
    /// of tables.
    pub struct FlowDummyTableSource {
        /// `(table id, table name, table description)` entries.
        pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
        /// Table id -> position of the entry in `id_names_to_desc`.
        id_to_idx: HashMap<TableId, usize>,
        /// Table name -> position of the entry in `id_names_to_desc`.
        name_to_idx: HashMap<TableName, usize>,
    }

    impl Default for FlowDummyTableSource {
        /// Registers two tables in `greptime.public`:
        /// - `numbers` (id 1024) with a single `number` column,
        /// - `numbers_with_ts` (id 1025) with a `number` and a `ts` column.
        fn default() -> Self {
            let public_table = |table: &str| -> TableName {
                [
                    "greptime".to_string(),
                    "public".to_string(),
                    table.to_string(),
                ]
            };

            let id_names_to_desc = vec![
                (
                    1024,
                    public_table("numbers"),
                    TableDesc::new_no_default(
                        RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
                            .into_named(vec![Some("number".to_string())]),
                    ),
                ),
                (
                    1025,
                    public_table("numbers_with_ts"),
                    TableDesc::new_no_default(
                        RelationType::new(vec![
                            ColumnType::new(CDT::uint32_datatype(), false),
                            ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
                        ])
                        .into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
                    ),
                ),
            ];
            let id_to_idx = id_names_to_desc
                .iter()
                .enumerate()
                .map(|(idx, (id, _name, _desc))| (*id, idx))
                .collect();
            let name_to_idx = id_names_to_desc
                .iter()
                .enumerate()
                .map(|(idx, (_id, name, _desc))| (name.clone(), idx))
                .collect();
            Self {
                id_names_to_desc,
                id_to_idx,
                name_to_idx,
            }
        }
    }

    impl FlowDummyTableSource {
        /// Finds the entry registered for `table_id` through the id index.
        ///
        /// Fails with `TableNotFound` both when the id is unknown and when the index points
        /// outside `id_names_to_desc`.
        fn entry_by_id(
            &self,
            table_id: &TableId,
        ) -> Result<&(TableId, TableName, TableDesc), Error> {
            self.id_to_idx
                .get(table_id)
                .and_then(|idx| self.id_names_to_desc.get(*idx))
                .with_context(|| TableNotFoundSnafu {
                    name: format!("Table id = {:?}, couldn't found table desc", table_id),
                })
        }
    }

    #[async_trait::async_trait]
    impl FlowTableSource for FlowDummyTableSource {
        /// Returns a clone of the table description registered for `table_id`.
        async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
            self.entry_by_id(table_id)
                .map(|(_id, _name, desc)| desc.clone())
        }

        /// Returns a clone of the table name registered for `table_id`.
        async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
            self.entry_by_id(table_id)
                .map(|(_id, name, _desc)| name.clone())
        }

        /// Returns the id of the first entry whose table name equals `name`.
        async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
            // NOTE(review): `name_to_idx` is built but never consulted; the scan over
            // `id_names_to_desc` is kept so lookups behave exactly as before.
            self.id_names_to_desc
                .iter()
                .find(|(_id, table_name, _desc)| table_name == name)
                .map(|(id, _table_name, _desc)| *id)
                .with_context(|| TableNotFoundSnafu {
                    name: format!("Table name = {:?}, couldn't found table id", name),
                })
        }
    }

    /// Opaque `Debug` output: only the type name is printed.
    impl std::fmt::Debug for FlowDummyTableSource {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("FlowDummyTableSource").finish()
        }
    }
}