use std::pin::Pin;
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::UInt32Vector;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::prelude::*;
use store_api::data_source::DataSource;
use store_api::storage::{RegionNumber, ScanRequest};

use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
use crate::metadata::{
    FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
use crate::{Table, TableRef};

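/// An immutable, in-memory table backed by a single [`RecordBatch`], intended
/// for tests.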
pub struct MemTable;

impl MemTable {
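    /// Creates a [`TableRef`] named `table_name` over `recordbatch`, placed in
    /// a single region `0`.
    ///
    /// A minimal usage sketch; marked `ignore` because the import paths depend
    /// on how this test utility is re-exported:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// use common_recordbatch::RecordBatch;
    /// use datatypes::prelude::*;
    /// use datatypes::schema::{ColumnSchema, Schema};
    /// use datatypes::vectors::UInt32Vector;
    ///
    /// let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
    ///     "nums",
    ///     ConcreteDataType::uint32_datatype(),
    ///     true,
    /// )]));
    /// let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice(vec![1u32, 2, 3]))];
    /// let batch = RecordBatch::new(schema, columns).unwrap();
    /// let table = MemTable::table("nums", batch);
    /// ```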
    pub fn table(table_name: impl Into<String>, recordbatch: RecordBatch) -> TableRef {
        Self::new_with_region(table_name, recordbatch, vec![0])
    }

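    /// Creates a [`TableRef`] over `recordbatch` with the given region
    /// numbers, using table id `1` under the default catalog and schema.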
    pub fn new_with_region(
        table_name: impl Into<String>,
        recordbatch: RecordBatch,
        regions: Vec<RegionNumber>,
    ) -> TableRef {
        Self::new_with_catalog(
            table_name,
            recordbatch,
            1,
            DEFAULT_CATALOG_NAME.to_string(),
            DEFAULT_SCHEMA_NAME.to_string(),
            regions,
        )
    }

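    /// Creates a [`TableRef`] over `recordbatch` with full control over the
    /// table id, catalog name, schema name, and region numbers. The metadata
    /// is deliberately minimal: no primary key, table version `0`, and the
    /// "mito" engine name.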
    pub fn new_with_catalog(
        table_name: impl Into<String>,
        recordbatch: RecordBatch,
        table_id: TableId,
        catalog_name: String,
        schema_name: String,
        regions: Vec<RegionNumber>,
    ) -> TableRef {
        let schema = recordbatch.schema.clone();

        let meta = TableMetaBuilder::empty()
            .schema(schema)
            .primary_key_indices(vec![])
            .value_indices(vec![])
            .engine("mito".to_string())
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .region_numbers(regions)
            .build()
            .unwrap();

        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(table_id)
                .table_version(0 as TableVersion)
                .name(table_name.into())
                .schema_name(schema_name)
                .catalog_name(catalog_name)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );

        let data_source = Arc::new(MemtableDataSource { recordbatch });
        let table = Table::new(info, FilterPushDownType::Unsupported, data_source);
        Arc::new(table)
    }

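    /// Creates a numbers table with the default row count of 100.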
    pub fn default_numbers_table() -> TableRef {
        Self::specified_numbers_table(100)
    }

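    /// Creates a table named "numbers" with a single nullable `uint32s`
    /// column holding the values `0..rows`.
    ///
    /// A sketch of scanning it, mirroring the tests below; `ignore`d because
    /// it needs an async context and the crate's re-export paths:
    ///
    /// ```ignore
    /// use common_recordbatch::util;
    /// use store_api::storage::ScanRequest;
    ///
    /// let table = MemTable::specified_numbers_table(10);
    /// let stream = table.scan_to_stream(ScanRequest::default()).await.unwrap();
    /// let batches = util::collect(stream).await.unwrap();
    /// assert_eq!(1, batches.len());
    /// ```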
    pub fn specified_numbers_table(rows: u32) -> TableRef {
        let column_schemas = vec![ColumnSchema::new(
            "uint32s",
            ConcreteDataType::uint32_datatype(),
            true,
        )];
        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![Arc::new(UInt32Vector::from_slice(
            (0..rows).collect::<Vec<_>>(),
        ))];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        MemTable::table("numbers", recordbatch)
    }
}

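/// The [`DataSource`] behind [`MemTable`]: serves the single in-memory
/// [`RecordBatch`], honoring the projection and limit in the [`ScanRequest`].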
struct MemtableDataSource {
    recordbatch: RecordBatch,
}

impl DataSource for MemtableDataSource {
    fn get_stream(
        &self,
        request: ScanRequest,
    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
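        // Apply the requested column projection, if any, to the underlying
        // DataFusion record batch.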
        let df_recordbatch = if let Some(indices) = request.projection {
            self.recordbatch
                .df_record_batch()
                .project(&indices)
                .context(TableProjectionSnafu)
                .map_err(BoxedError::new)?
        } else {
            self.recordbatch.df_record_batch().clone()
        };

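        // Truncate to the requested limit, capped at the number of rows
        // actually present.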
        let rows = df_recordbatch.num_rows();
        let limit = if let Some(limit) = request.limit {
            limit.min(rows)
        } else {
            rows
        };
        let df_recordbatch = df_recordbatch.slice(0, limit);

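        // Convert back into this crate's RecordBatch, rebuilding the
        // (possibly projected) schema from the DataFusion one.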
        let recordbatch = RecordBatch::try_from_df_record_batch(
            Arc::new(
                Schema::try_from(df_recordbatch.schema())
                    .context(SchemaConversionSnafu)
                    .map_err(BoxedError::new)?,
            ),
            df_recordbatch,
        )
        .map_err(BoxedError::new)
        .context(TablesRecordBatchSnafu)
        .map_err(BoxedError::new)?;

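        // Wrap the single batch in a stream that yields it exactly once.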
        Ok(Box::pin(MemtableStream {
            schema: recordbatch.schema.clone(),
            recordbatch: Some(recordbatch),
        }))
    }
}

impl RecordBatchStream for MemtableStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

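/// A one-shot stream: holds the batch in an `Option` and hands it out on the
/// first poll.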
struct MemtableStream {
    schema: SchemaRef,
    recordbatch: Option<RecordBatch>,
}

impl Stream for MemtableStream {
    type Item = RecordBatchResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
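        // `take` leaves `None` behind, so the batch is yielded once and every
        // subsequent poll reports end of stream.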
        match self.recordbatch.take() {
            Some(records) => Poll::Ready(Some(Ok(records))),
            None => Poll::Ready(None),
        }
    }
}

#[cfg(test)]
mod test {
    use common_recordbatch::util;
    use datatypes::prelude::*;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Helper, Int32Vector, StringVector};

    use super::*;

    #[tokio::test]
    async fn test_scan_with_projection() {
        let table = build_testing_table();

        let scan_req = ScanRequest {
            projection: Some(vec![1]),
            ..Default::default()
        };
        let stream = table.scan_to_stream(scan_req).await.unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        let columns = recordbatch[0].df_record_batch().columns();
        assert_eq!(1, columns.len());

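        // Projection `[1]` selected only the strings column; `flatten` drops
        // the nulls, leaving the two non-null values.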
        let string_column = Helper::try_into_vector(&columns[0]).unwrap();
        let string_column = string_column
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        let string_column = string_column.iter_data().flatten().collect::<Vec<&str>>();
        assert_eq!(vec!["hello", "greptime"], string_column);
    }

    #[tokio::test]
    async fn test_scan_with_limit() {
        let table = build_testing_table();

        let scan_req = ScanRequest {
            limit: Some(2),
            ..Default::default()
        };
        let stream = table.scan_to_stream(scan_req).await.unwrap();
        let recordbatch = util::collect(stream).await.unwrap();
        assert_eq!(1, recordbatch.len());
        let columns = recordbatch[0].df_record_batch().columns();
        assert_eq!(2, columns.len());

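        // With `limit: Some(2)` only the rows `(-100, "hello")` and
        // `(None, None)` survive; `flatten` then drops the nulls, so each
        // column collects to a single value.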
        let i32_column = Helper::try_into_vector(&columns[0]).unwrap();
        let i32_column = i32_column.as_any().downcast_ref::<Int32Vector>().unwrap();
        let i32_column = i32_column.iter_data().flatten().collect::<Vec<i32>>();
        assert_eq!(vec![-100], i32_column);

        let string_column = Helper::try_into_vector(&columns[1]).unwrap();
        let string_column = string_column
            .as_any()
            .downcast_ref::<StringVector>()
            .unwrap();
        let string_column = string_column.iter_data().flatten().collect::<Vec<&str>>();
        assert_eq!(vec!["hello"], string_column);
    }

    fn build_testing_table() -> TableRef {
        let i32_column_schema =
            ColumnSchema::new("i32_numbers", ConcreteDataType::int32_datatype(), true);
        let string_column_schema =
            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true);
        let column_schemas = vec![i32_column_schema, string_column_schema];

        let schema = Arc::new(Schema::new(column_schemas));
        let columns: Vec<VectorRef> = vec![
            Arc::new(Int32Vector::from(vec![
                Some(-100),
                None,
                Some(1),
                Some(100),
            ])),
            Arc::new(StringVector::from(vec![
                Some("hello"),
                None,
                Some("greptime"),
                None,
            ])),
        ];
        let recordbatch = RecordBatch::new(schema, columns).unwrap();
        MemTable::table("", recordbatch)
    }
}