table/test_util/
memtable.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17
18use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
19use common_error::ext::BoxedError;
20use common_recordbatch::adapter::RecordBatchMetrics;
21use common_recordbatch::error::Result as RecordBatchResult;
22use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
23use datatypes::prelude::*;
24use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
25use datatypes::vectors::UInt32Vector;
26use futures::task::{Context, Poll};
27use futures::Stream;
28use snafu::prelude::*;
29use store_api::data_source::DataSource;
30use store_api::storage::{RegionNumber, ScanRequest};
31
32use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
33use crate::metadata::{
34    FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
35};
36use crate::{Table, TableRef};
37
38pub struct MemTable;
39
40impl MemTable {
41    pub fn table(table_name: impl Into<String>, recordbatch: RecordBatch) -> TableRef {
42        Self::new_with_region(table_name, recordbatch, vec![0])
43    }
44
45    pub fn new_with_region(
46        table_name: impl Into<String>,
47        recordbatch: RecordBatch,
48        regions: Vec<RegionNumber>,
49    ) -> TableRef {
50        Self::new_with_catalog(
51            table_name,
52            recordbatch,
53            1,
54            DEFAULT_CATALOG_NAME.to_string(),
55            DEFAULT_SCHEMA_NAME.to_string(),
56            regions,
57        )
58    }
59
60    pub fn new_with_catalog(
61        table_name: impl Into<String>,
62        recordbatch: RecordBatch,
63        table_id: TableId,
64        catalog_name: String,
65        schema_name: String,
66        regions: Vec<RegionNumber>,
67    ) -> TableRef {
68        let schema = recordbatch.schema.clone();
69
70        let meta = TableMetaBuilder::empty()
71            .schema(schema)
72            .primary_key_indices(vec![])
73            .value_indices(vec![])
74            .engine("mito".to_string())
75            .next_column_id(0)
76            .options(Default::default())
77            .created_on(Default::default())
78            .region_numbers(regions)
79            .build()
80            .unwrap();
81
82        let info = Arc::new(
83            TableInfoBuilder::default()
84                .table_id(table_id)
85                .table_version(0 as TableVersion)
86                .name(table_name.into())
87                .schema_name(schema_name)
88                .catalog_name(catalog_name)
89                .desc(None)
90                .table_type(TableType::Base)
91                .meta(meta)
92                .build()
93                .unwrap(),
94        );
95
96        let data_source = Arc::new(MemtableDataSource { recordbatch });
97        let table = Table::new(info, FilterPushDownType::Unsupported, data_source);
98        Arc::new(table)
99    }
100
101    /// Creates a 1 column 100 rows table, with table name "numbers", column name "uint32s" and
102    /// column type "uint32". Column data increased from 0 to 100.
103    pub fn default_numbers_table() -> TableRef {
104        Self::specified_numbers_table(100)
105    }
106
107    pub fn specified_numbers_table(rows: u32) -> TableRef {
108        let column_schemas = vec![ColumnSchema::new(
109            "uint32s",
110            ConcreteDataType::uint32_datatype(),
111            true,
112        )];
113        let schema = Arc::new(Schema::new(column_schemas));
114        let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice(
115            (0..rows).collect::<Vec<_>>(),
116        ))];
117        let recordbatch = RecordBatch::new(schema, columns).unwrap();
118        MemTable::table("numbers", recordbatch)
119    }
120}
121
122struct MemtableDataSource {
123    recordbatch: RecordBatch,
124}
125
126impl DataSource for MemtableDataSource {
127    fn get_stream(
128        &self,
129        request: ScanRequest,
130    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
131        let df_recordbatch = if let Some(indices) = request.projection {
132            self.recordbatch
133                .df_record_batch()
134                .project(&indices)
135                .context(TableProjectionSnafu)
136                .map_err(BoxedError::new)?
137        } else {
138            self.recordbatch.df_record_batch().clone()
139        };
140
141        let rows = df_recordbatch.num_rows();
142        let limit = if let Some(limit) = request.limit {
143            limit.min(rows)
144        } else {
145            rows
146        };
147        let df_recordbatch = df_recordbatch.slice(0, limit);
148
149        let recordbatch = RecordBatch::try_from_df_record_batch(
150            Arc::new(
151                Schema::try_from(df_recordbatch.schema())
152                    .context(SchemaConversionSnafu)
153                    .map_err(BoxedError::new)?,
154            ),
155            df_recordbatch,
156        )
157        .map_err(BoxedError::new)
158        .context(TablesRecordBatchSnafu)
159        .map_err(BoxedError::new)?;
160
161        Ok(Box::pin(MemtableStream {
162            schema: recordbatch.schema.clone(),
163            recordbatch: Some(recordbatch),
164        }))
165    }
166}
167
168impl RecordBatchStream for MemtableStream {
169    fn schema(&self) -> SchemaRef {
170        self.schema.clone()
171    }
172
173    fn output_ordering(&self) -> Option<&[OrderOption]> {
174        None
175    }
176
177    fn metrics(&self) -> Option<RecordBatchMetrics> {
178        None
179    }
180}
181
182struct MemtableStream {
183    schema: SchemaRef,
184    recordbatch: Option<RecordBatch>,
185}
186
187impl Stream for MemtableStream {
188    type Item = RecordBatchResult<RecordBatch>;
189
190    fn poll_next(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191        match self.recordbatch.take() {
192            Some(records) => Poll::Ready(Some(Ok(records))),
193            None => Poll::Ready(None),
194        }
195    }
196}
197
#[cfg(test)]
mod test {
    use common_recordbatch::util;
    use datatypes::prelude::*;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, Int32Vector, StringVector};

    use super::*;

    #[tokio::test]
    async fn test_scan_with_projection() {
        let table = build_testing_table();

        let scan_req = ScanRequest {
            projection: Some(vec![1]),
            ..Default::default()
        };
        let stream = table.scan_to_stream(scan_req).await.unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        // Projection must keep every row, including the ones holding nulls, which the
        // `flatten()` below would otherwise hide.
        assert_eq!(4, recordbatch[0].df_record_batch().num_rows());
        let columns = recordbatch[0].df_record_batch().columns();
        assert_eq!(1, columns.len());

        let string_column = Helper::try_into_vector(&columns[0]).unwrap();
        let string_column = string_column
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        let string_column = string_column.iter_data().flatten().collect::<Vec<&str>>();
        assert_eq!(vec!["hello", "greptime"], string_column);
    }

    #[tokio::test]
    async fn test_scan_with_limit() {
        let table = build_testing_table();

        let scan_req = ScanRequest {
            limit: Some(2),
            ..Default::default()
        };
        let stream = table.scan_to_stream(scan_req).await.unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        // The second row is all-null, so check the row count explicitly.
        assert_eq!(2, recordbatch[0].df_record_batch().num_rows());
        let columns = recordbatch[0].df_record_batch().columns();
        assert_eq!(2, columns.len());

        let i32_column = Helper::try_into_vector(&columns[0]).unwrap();
        let i32_column = i32_column.as_any().downcast_ref::<Int32Vector>().unwrap();
        let i32_column = i32_column.iter_data().flatten().collect::<Vec<i32>>();
        assert_eq!(vec![-100], i32_column);

        let string_column = Helper::try_into_vector(&columns[1]).unwrap();
        let string_column = string_column
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        let string_column = string_column.iter_data().flatten().collect::<Vec<&str>>();
        assert_eq!(vec!["hello"], string_column);
    }

    #[tokio::test]
    async fn test_scan_with_limit_exceeding_rows() {
        let table = build_testing_table();

        // A limit larger than the table must be clamped, not cause an out-of-range slice.
        let scan_req = ScanRequest {
            limit: Some(10),
            ..Default::default()
        };
        let stream = table.scan_to_stream(scan_req).await.unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        assert_eq!(4, recordbatch[0].df_record_batch().num_rows());
        assert_eq!(2, recordbatch[0].df_record_batch().columns().len());
    }

    #[tokio::test]
    async fn test_default_numbers_table() {
        let table = MemTable::default_numbers_table();

        let stream = table
            .scan_to_stream(ScanRequest::default())
            .await
            .unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        let columns = recordbatch[0].df_record_batch().columns();
        assert_eq!(1, columns.len());

        let numbers = Helper::try_into_vector(&columns[0]).unwrap();
        let numbers = numbers.as_any().downcast_ref::<UInt32Vector>().unwrap();
        let numbers = numbers.iter_data().flatten().collect::<Vec<u32>>();
        // Values run from 0 to 99 inclusive.
        assert_eq!((0..100).collect::<Vec<u32>>(), numbers);
    }

    /// Builds a 4-row table with an `i32_numbers` column and a `strings` column, both
    /// containing nulls.
    fn build_testing_table() -> TableRef {
        let i32_column_schema =
            ColumnSchema::new("i32_numbers", ConcreteDataType::int32_datatype(), true);
        let string_column_schema =
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true);
        let column_schemas = vec![i32_column_schema, string_column_schema];

        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(Int32Vector::from(vec![
                Some(-100),
                None,
                Some(1),
                Some(100),
            ])),
            Arc::new(StringVector::from(vec![
                Some("hello"),
                None,
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        MemTable::table("", recordbatch)
    }
}