mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
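/// The region metadata is serialized to JSON and stored under this key in the
/// parquet file's key-value metadata (see how `test_read_large_binary` below
/// builds its writer properties).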
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
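/// With the default read batch size of 1024, this is 102,400 rows per row group.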
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
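///
/// A minimal usage sketch (mirroring the tests below): override only the
/// fields you need and take the defaults for the rest.
///
/// ```ignore
/// let write_opts = WriteOptions {
///     row_group_size: 50,
///     ..Default::default()
/// };
/// ```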
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
        }
    }
}

/// Parquet SST info returned by the writer.
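///
/// A sketch of how the writer's output is typically inspected (`writer`,
/// `source` and `write_opts` are set up as in `test_write_read` below):
///
/// ```ignore
/// let info = writer.write_all(source, None, &write_opts).await?.remove(0);
/// assert!(info.file_size > 0);
/// println!("{} rows in {} row groups", info.num_rows, info.num_row_groups);
/// ```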
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// Parquet file metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::sst::index::{Indexer, IndexerBuilder};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
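        // With `row_group_size` set to 50, the reader returns batches split at the
        // 50-row row group boundaries.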
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // The compressed pages should not be cached.
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // All 4 row groups (200 rows / 50 rows per group) should have uncompressed pages cached.
        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        // A row group index past the end of the file has nothing cached.
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // create test env
        let mut env = crate::test_util::TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // read the sst file metadata
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: tag_0 == "a".
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
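        // The two zero-row batches contribute no data, so only the 30 rows in
        // [200, 230) come back.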
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate: field_0 >= 150.
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
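        // Write the file by hand with `LargeBinary` columns instead of `Binary` and
        // check that the reader still decodes the data.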
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }
}