//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

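/// Key of the region metadata stored in the parquet SST's key-value metadata.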
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

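/// Parquet SST write options.
///
/// A minimal usage sketch: override a single option and keep the remaining
/// defaults via struct update syntax. The values below are illustrative only,
/// not recommendations:
///
/// ```ignore
/// let opts = WriteOptions {
///     row_group_size: 4096,
///     ..Default::default()
/// };
/// assert!(opts.max_file_size.is_none());
/// ```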
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for the async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size of the output parquet file.
    pub row_group_size: usize,
    /// Max size of a single output file. This is a soft limit: the writer only
    /// observes the written size after flushing a row group, so a file may
    /// slightly exceed it.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

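/// Parquet SST info returned by the writer.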
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet metadata of the written file.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata of the written file.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::BatchReader;
    use crate::sst::index::{Indexer, IndexerBuilder};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

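    /// Indexer builder used by tests; it builds a default [`Indexer`], so no index is created.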
    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

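    // Writes batches of several primary keys and checks the reader returns them
    // split at row group boundaries (row_group_size is 50).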
    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

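        // Enable the page cache and read the file several times so later reads can hit it.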
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

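        // The compressed page is not cached, while the uncompressed pages of all
        // four written row groups are.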
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

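    // The parquet metadata returned by the writer should equal the metadata the
    // reader loads from the written file.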
    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        let mut env = crate::test_util::TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

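    // Reads with a predicate on the tag column (tag_0 = "a") and expects only the
    // batches of the matching primary key.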
    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

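    // Empty input batches produce no rows; only the non-empty batch is read back.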
    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

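    // Reads with a predicate on a field column (field_0 >= 150); row groups whose
    // statistics cannot match are pruned, leaving only the last 50 rows.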
    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

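    // Writes the file directly with AsyncArrowWriter, casting binary columns to
    // LargeBinary, and verifies the reader can still decode the data.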
    #[tokio::test]
    async fn test_read_large_binary() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

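    // With max_file_size set, write_all splits the output into multiple SST files;
    // every row must still be readable across all of them.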
    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };

        let path_provider = RegionFilePathFactory {
            region_dir: "test".to_string(),
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder =
                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }
}