catalog/system_schema/memory_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod table_columns;

use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::SystemTable;

/// A memory table with specified schema and columns.
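///
/// A minimal construction sketch (mirrors the unit tests below; not compiled as a doc-test):
///
/// ```ignore
/// use std::sync::Arc;
/// use datatypes::prelude::ConcreteDataType;
/// use datatypes::schema::{ColumnSchema, Schema};
/// use datatypes::vectors::StringVector;
///
/// let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
///     "greeting",
///     ConcreteDataType::string_datatype(),
///     false,
/// )]));
/// // Table id 1 and the name "example" are placeholders for illustration only.
/// let table = MemoryTable::new(
///     1,
///     "example",
///     schema,
///     vec![Arc::new(StringVector::from(vec!["hello"]))],
/// );
/// ```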
#[derive(Debug)]
pub(crate) struct MemoryTable {
    pub(crate) table_id: TableId,
    pub(crate) table_name: &'static str,
    pub(crate) schema: SchemaRef,
    pub(crate) columns: Vec<VectorRef>,
}

impl MemoryTable {
    /// Creates a memory table with table id, name, schema and columns.
    pub fn new(
        table_id: TableId,
        table_name: &'static str,
        schema: SchemaRef,
        columns: Vec<VectorRef>,
    ) -> Self {
        Self {
            table_id,
            table_name,
            schema,
            columns,
        }
    }

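    /// Returns a [`MemoryTableBuilder`] holding a clone of this table's schema and columns.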
    pub fn builder(&self) -> MemoryTableBuilder {
        MemoryTableBuilder::new(self.schema.clone(), self.columns.clone())
    }
}

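/// Builder that assembles the in-memory [`RecordBatch`] for a [`MemoryTable`].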
pub(crate) struct MemoryTableBuilder {
    schema: SchemaRef,
    columns: Vec<VectorRef>,
}

impl MemoryTableBuilder {
    fn new(schema: SchemaRef, columns: Vec<VectorRef>) -> Self {
        Self { schema, columns }
    }

    /// Constructs the record batch for the `information_schema.{table_name}` virtual table.
    pub async fn memory_records(&mut self) -> Result<RecordBatch> {
        if self.columns.is_empty() {
            Ok(RecordBatch::new_empty(self.schema.clone()))
        } else {
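            // `std::mem::take` moves the columns out of the builder, so a later call returns an empty batch.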
            RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns))
                .context(CreateRecordBatchSnafu)
        }
    }
}

impl DfPartitionStream for MemoryTable {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
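        // Build the record batch lazily, when the one-shot stream is first polled.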
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .memory_records()
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

impl SystemTable for MemoryTable {
    fn table_id(&self) -> TableId {
        self.table_id
    }

    fn table_name(&self) -> &'static str {
        self.table_name
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .memory_records()
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
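        // Adapt the DataFusion stream into this crate's `SendableRecordBatchStream`.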
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_recordbatch::RecordBatches;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::StringVector;

    use super::*;
    use crate::system_schema::SystemTable;

    #[tokio::test]
    async fn test_memory_table() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
        ]));

        let table = MemoryTable::new(
            42,
            "test",
            schema.clone(),
            vec![
                Arc::new(StringVector::from(vec!["a1", "a2"])),
                Arc::new(StringVector::from(vec!["b1", "b2"])),
            ],
        );

        assert_eq!(42, table.table_id());
        assert_eq!("test", table.table_name());
        assert_eq!(schema, SystemTable::schema(&table));

        let stream = table.to_stream(ScanRequest::default()).unwrap();

        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(
            "\
+----+----+
| a  | b  |
+----+----+
| a1 | b1 |
| a2 | b2 |
+----+----+",
            batches.pretty_print().unwrap()
        );
    }

    #[tokio::test]
    async fn test_empty_memory_table() {
        let schema = Arc::new(Schema::new(vec![
            ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
            ColumnSchema::new("b", ConcreteDataType::string_datatype(), false),
        ]));

        let table = MemoryTable::new(42, "test", schema.clone(), vec![]);

        assert_eq!(42, table.table_id());
        assert_eq!("test", table.table_name());
        assert_eq!(schema, SystemTable::schema(&table));

        let stream = table.to_stream(ScanRequest::default()).unwrap();

        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(
            "\
+---+---+
| a | b |
+---+---+
+---+---+",
            batches.pretty_print().unwrap()
        );
    }
}