use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of the region metadata stored in the parquet SST's key-value metadata.
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
        }
    }
}
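// A minimal usage sketch (mirroring the tests below): override a single field
// and keep the remaining defaults via struct update syntax.
//
//     let write_opts = WriteOptions {
//         row_group_size: 50,
//         ..Default::default()
//     };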

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the file, if available.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata of the file.
    pub index_metadata: IndexOutput,
}
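// Sketch of how the writer produces `SstInfo` (grounded in `test_write_read`
// below); `ParquetWriter::write_all` returns one `SstInfo` per written file:
//
//     let info = writer.write_all(source, None, &write_opts).await.unwrap().remove(0);
//     assert_eq!(200, info.num_rows);
//     assert!(info.file_size > 0);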

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::sst::index::{Indexer, IndexerBuilder};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    /// Path provider that always builds paths under [`FILE_DIR`] for a fixed file id.
    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

    /// Indexer builder that produces a no-op [`Indexer`] for tests.
    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            // Use a small row group size for test.
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            // Use a small row group size for test.
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable the page cache so repeated reads hit it.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // Compressed pages are not cached.
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // Uncompressed pages of the 4 row groups read above are cached.
        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        // The file only has 4 row groups (200 rows / 50 rows per group), so row group 5 has no pages.
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST and keep the parquet metadata returned by the writer.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Read the parquet metadata back from the file and compare it with the writer's.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            // Use a small row group size for test.
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 = "a".
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            // Use a small row group size for test.
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Only the non-empty batch should be read back.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            // Use a small row group size for test.
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        // Replace binary fields in the write schema with LargeBinary.
        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensure the field is now LargeBinary.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        // Cast binary arrays to LargeBinary so the batch matches the schema above.
        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        // The reader should read LargeBinary columns back as regular batches.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }
}