1pub mod information_schema;
16mod memory_table;
17pub mod numbers_table_provider;
18pub mod pg_catalog;
19pub mod predicate;
20mod utils;
21
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use common_error::ext::BoxedError;
26use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
27use datatypes::schema::SchemaRef;
28use futures_util::StreamExt;
29use snafu::ResultExt;
30use store_api::data_source::DataSource;
31use store_api::storage::ScanRequest;
32use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
33use table::metadata::{
34 FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
35};
36use table::{Table, TableRef};
37
38use crate::error::Result;
39
40pub trait SystemSchemaProvider {
41 fn tables(&self) -> &HashMap<String, TableRef>;
43
44 fn table(&self, name: &str) -> Option<TableRef> {
46 self.tables().get(name).cloned()
47 }
48
49 fn table_names(&self) -> Vec<String> {
51 let mut tables = self.tables().values().clone().collect::<Vec<_>>();
52
53 tables.sort_by(|t1, t2| {
54 t1.table_info()
55 .table_id()
56 .partial_cmp(&t2.table_info().table_id())
57 .unwrap()
58 });
59 tables
60 .into_iter()
61 .map(|t| t.table_info().name.clone())
62 .collect()
63 }
64}
65
66trait SystemSchemaProviderInner {
67 fn catalog_name(&self) -> &str;
68 fn schema_name() -> &'static str;
69 fn build_table(&self, name: &str) -> Option<TableRef> {
70 self.system_table(name).map(|table| {
71 let table_info = Self::table_info(self.catalog_name().to_string(), &table);
72 let filter_pushdown = FilterPushDownType::Inexact;
73 let data_source = Arc::new(SystemTableDataSource::new(table));
74 let table = Table::new(table_info, filter_pushdown, data_source);
75 Arc::new(table)
76 })
77 }
78 fn system_table(&self, name: &str) -> Option<SystemTableRef>;
79
80 fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef {
81 let table_meta = TableMetaBuilder::empty()
82 .schema(table.schema())
83 .primary_key_indices(vec![])
84 .next_column_id(0)
85 .build()
86 .unwrap();
87 let table_info = TableInfoBuilder::default()
88 .table_id(table.table_id())
89 .name(table.table_name().to_string())
90 .catalog_name(catalog_name)
91 .schema_name(Self::schema_name().to_string())
92 .meta(table_meta)
93 .table_type(table.table_type())
94 .build()
95 .unwrap();
96 Arc::new(table_info)
97 }
98}
99
100pub trait SystemTable {
101 fn table_id(&self) -> TableId;
102
103 fn table_name(&self) -> &'static str;
104
105 fn schema(&self) -> SchemaRef;
106
107 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
108
109 fn table_type(&self) -> TableType {
110 TableType::Temporary
111 }
112}
113
114pub type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;
115
116struct SystemTableDataSource {
117 table: SystemTableRef,
118}
119
120impl SystemTableDataSource {
121 fn new(table: SystemTableRef) -> Self {
122 Self { table }
123 }
124
125 fn try_project(&self, projection: &[usize]) -> std::result::Result<SchemaRef, BoxedError> {
126 let schema = self
127 .table
128 .schema()
129 .try_project(projection)
130 .context(SchemaConversionSnafu)
131 .map_err(BoxedError::new)?;
132 Ok(Arc::new(schema))
133 }
134}
135
136impl DataSource for SystemTableDataSource {
137 fn get_stream(
138 &self,
139 request: ScanRequest,
140 ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
141 let projected_schema = match &request.projection {
142 Some(projection) => self.try_project(projection)?,
143 None => self.table.schema(),
144 };
145
146 let projection = request.projection.clone();
147 let stream = self
148 .table
149 .to_stream(request)
150 .map_err(BoxedError::new)
151 .context(TablesRecordBatchSnafu)
152 .map_err(BoxedError::new)?
153 .map(move |batch| match (&projection, batch) {
154 (Some(p), Ok(b)) if b.num_columns() != p.len() => b.try_project(p),
158 (_, res) => res,
159 });
160
161 let stream = RecordBatchStreamWrapper {
162 schema: projected_schema,
163 stream: Box::pin(stream),
164 output_ordering: None,
165 metrics: Default::default(),
166 };
167
168 Ok(Box::pin(stream))
169 }
170}