1pub mod information_schema;
16mod memory_table;
17pub mod numbers_table_provider;
18pub mod pg_catalog;
19pub mod predicate;
20mod utils;
21
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use common_error::ext::BoxedError;
26use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
27use common_telemetry::tracing::Span;
28use datatypes::schema::SchemaRef;
29use futures_util::StreamExt;
30use snafu::ResultExt;
31use store_api::data_source::DataSource;
32use store_api::storage::ScanRequest;
33use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
34use table::metadata::{
35 FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
36};
37use table::{Table, TableRef};
38
39use crate::error::Result;
40
41pub trait SystemSchemaProvider {
42 fn tables(&self) -> &HashMap<String, TableRef>;
44
45 fn table(&self, name: &str) -> Option<TableRef> {
47 self.tables().get(name).cloned()
48 }
49
50 fn table_names(&self) -> Vec<String> {
52 let mut tables = self.tables().values().clone().collect::<Vec<_>>();
53
54 tables.sort_by(|t1, t2| {
55 t1.table_info()
56 .table_id()
57 .partial_cmp(&t2.table_info().table_id())
58 .unwrap()
59 });
60 tables
61 .into_iter()
62 .map(|t| t.table_info().name.clone())
63 .collect()
64 }
65}
66
67trait SystemSchemaProviderInner {
68 fn catalog_name(&self) -> &str;
69 fn schema_name() -> &'static str;
70 fn build_table(&self, name: &str) -> Option<TableRef> {
71 self.system_table(name).map(|table| {
72 let table_info = Self::table_info(self.catalog_name().to_string(), &table);
73 let filter_pushdown = FilterPushDownType::Inexact;
74 let data_source = Arc::new(SystemTableDataSource::new(table));
75 let table = Table::new(table_info, filter_pushdown, data_source);
76 Arc::new(table)
77 })
78 }
79 fn system_table(&self, name: &str) -> Option<SystemTableRef>;
80
81 fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef {
82 let table_meta = TableMetaBuilder::empty()
83 .schema(table.schema())
84 .primary_key_indices(vec![])
85 .next_column_id(0)
86 .build()
87 .unwrap();
88 let table_info = TableInfoBuilder::default()
89 .table_id(table.table_id())
90 .name(table.table_name().to_string())
91 .catalog_name(catalog_name)
92 .schema_name(Self::schema_name().to_string())
93 .meta(table_meta)
94 .table_type(table.table_type())
95 .build()
96 .unwrap();
97 Arc::new(table_info)
98 }
99}
100
101pub trait SystemTable {
102 fn table_id(&self) -> TableId;
103
104 fn table_name(&self) -> &'static str;
105
106 fn schema(&self) -> SchemaRef;
107
108 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
109
110 fn table_type(&self) -> TableType {
111 TableType::Temporary
112 }
113}
114
115pub type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;
116
117struct SystemTableDataSource {
118 table: SystemTableRef,
119}
120
121impl SystemTableDataSource {
122 fn new(table: SystemTableRef) -> Self {
123 Self { table }
124 }
125
126 fn try_project(&self, projection: &[usize]) -> std::result::Result<SchemaRef, BoxedError> {
127 let schema = self
128 .table
129 .schema()
130 .try_project(projection)
131 .context(SchemaConversionSnafu)
132 .map_err(BoxedError::new)?;
133 Ok(Arc::new(schema))
134 }
135}
136
137impl DataSource for SystemTableDataSource {
138 fn get_stream(
139 &self,
140 request: ScanRequest,
141 ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
142 let projected_schema = match &request.projection {
143 Some(projection) => self.try_project(projection)?,
144 None => self.table.schema(),
145 };
146
147 let projection = request.projection.clone();
148 let stream = self
149 .table
150 .to_stream(request)
151 .map_err(BoxedError::new)
152 .context(TablesRecordBatchSnafu)
153 .map_err(BoxedError::new)?
154 .map(move |batch| match (&projection, batch) {
155 (Some(p), Ok(b)) if b.num_columns() != p.len() => b.try_project(p),
159 (_, res) => res,
160 });
161
162 let stream = RecordBatchStreamWrapper {
163 schema: projected_schema,
164 stream: Box::pin(stream),
165 output_ordering: None,
166 metrics: Default::default(),
167 span: Span::current(),
168 };
169
170 Ok(Box::pin(stream))
171 }
172}