// catalog/system_schema/memory_table.rs

mod table_columns;

use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::SystemTable;

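/// A virtual table whose rows live entirely in memory as pre-built column
/// vectors, so scanning it never touches a storage engine.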
#[derive(Debug)]
pub(crate) struct MemoryTable {
    pub(crate) table_id: TableId,
    pub(crate) table_name: &'static str,
    pub(crate) schema: SchemaRef,
    pub(crate) columns: Vec<VectorRef>,
}

impl MemoryTable {
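    /// Creates a memory table from a schema and its pre-built column vectors.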
    pub fn new(
        table_id: TableId,
        table_name: &'static str,
        schema: SchemaRef,
        columns: Vec<VectorRef>,
    ) -> Self {
        Self {
            table_id,
            table_name,
            schema,
            columns,
        }
    }

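    /// Returns a [`MemoryTableBuilder`] holding clones of this table's schema
    /// and columns, so each scan gets its own one-shot source of rows.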
    pub fn builder(&self) -> MemoryTableBuilder {
        MemoryTableBuilder::new(self.schema.clone(), self.columns.clone())
    }
}

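/// One-shot builder that turns a snapshot of a [`MemoryTable`]'s columns into
/// a [`RecordBatch`].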
pub(crate) struct MemoryTableBuilder {
    schema: SchemaRef,
    columns: Vec<VectorRef>,
}

impl MemoryTableBuilder {
    fn new(schema: SchemaRef, columns: Vec<VectorRef>) -> Self {
        Self { schema, columns }
    }

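    /// Produces the table's rows as a single [`RecordBatch`]; with no columns,
    /// yields an empty batch.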
    pub async fn memory_records(&mut self) -> Result<RecordBatch> {
        if self.columns.is_empty() {
            Ok(RecordBatch::new_empty(self.schema.clone()))
        } else {
            // Move the columns out of the builder so the batch is built at
            // most once; a later call would hit the empty branch above.
            RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns))
                .context(CreateRecordBatchSnafu)
        }
    }
}

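// DataFusion integration: expose the table as a partition stream that emits
// its whole contents as one record batch. As an illustrative aside (not shown
// in this module), a `PartitionStream` impl like this is what DataFusion's
// `StreamingTable::try_new(schema, partitions)` consumes, which would let the
// table be registered with a `SessionContext`.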
impl DfPartitionStream for MemoryTable {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .memory_records()
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

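// Integration with this crate's `SystemTable` trait: `to_stream` builds the
// same one-shot stream and wraps it so callers receive a `common_recordbatch`
// stream; the scan request is currently ignored.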
impl SystemTable for MemoryTable {
    fn table_id(&self) -> TableId {
        self.table_id
    }

    fn table_name(&self) -> &'static str {
        self.table_name
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .memory_records()
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::RecordBatches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::StringVector;

    use super::*;
    use crate::system_schema::SystemTable;

    #[tokio::test]
    async fn test_memory_table() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
        ]));

        let table = MemoryTable::new(
            42,
            "test",
            schema.clone(),
            vec![
                Arc::new(StringVector::from(vec!["a1", "a2"])),
                Arc::new(StringVector::from(vec!["b1", "b2"])),
            ],
        );

        assert_eq!(42, table.table_id());
        assert_eq!("test", table.table_name());
        assert_eq!(schema, SystemTable::schema(&table));

        let stream = table.to_stream(ScanRequest::default()).unwrap();

        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(
            "\
+----+----+
| a  | b  |
+----+----+
| a1 | b1 |
| a2 | b2 |
+----+----+",
            batches.pretty_print().unwrap()
        );
    }

    #[tokio::test]
    async fn test_empty_memory_table() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
        ]));

        let table = MemoryTable::new(42, "test", schema.clone(), vec![]);

        assert_eq!(42, table.table_id());
        assert_eq!("test", table.table_name());
        assert_eq!(schema, SystemTable::schema(&table));

        let stream = table.to_stream(ScanRequest::default()).unwrap();

        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(
            "\
+---+---+
| a | b |
+---+---+
+---+---+",
            batches.pretty_print().unwrap()
        );
    }
}