mito2/sst/parquet.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SST in parquet format.

use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;

use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
pub mod row_selection;
pub(crate) mod stats;
pub mod writer;

/// Key of metadata in parquet SST.
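/// The tests below store the JSON-encoded region metadata under this key via a
/// parquet `KeyValue` entry.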
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
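/// With the default read batch size this equals 100 * 1024 = 102,400 rows.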
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;

/// Parquet write options.
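///
/// A minimal usage sketch (illustrative only): override the fields you need and
/// rely on `Default` for the rest, as the tests in this module do.
///
/// ```ignore
/// let opts = WriteOptions {
///     row_group_size: 50,
///     ..Default::default()
/// };
/// assert_eq!(None, opts.max_file_size);
/// ```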
#[derive(Debug)]
pub struct WriteOptions {
    /// Buffer size for async writer.
    pub write_buffer_size: ReadableSize,
    /// Row group size.
    pub row_group_size: usize,
    /// Max single output file size.
    /// Note: This is not a hard limit as we can only observe the file size when
    /// the ArrowWriter writes to the underlying writer.
    pub max_file_size: Option<usize>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            row_group_size: DEFAULT_ROW_GROUP_SIZE,
            max_file_size: None,
        }
    }
}

/// Parquet SST info returned by the writer.
#[derive(Debug)]
pub struct SstInfo {
    /// SST file id.
    pub file_id: FileId,
    /// Time range of the SST. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// File size in bytes.
    pub file_size: u64,
    /// Number of rows.
    pub num_rows: usize,
    /// Number of row groups.
    pub num_row_groups: u64,
    /// File metadata.
    pub file_metadata: Option<Arc<ParquetMetaData>>,
    /// Index metadata.
    pub index_metadata: IndexOutput,
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_time::Timestamp;
    use datafusion_common::{Column, ScalarValue};
    use datafusion_expr::{BinaryExpr, Expr, Operator};
    use datatypes::arrow;
    use datatypes::arrow::array::RecordBatch;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::basic::{Compression, Encoding, ZstdLevel};
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use table::predicate::Predicate;
    use tokio_util::compat::FuturesAsyncWriteCompatExt;

    use super::*;
    use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
    use crate::cache::{CacheManager, CacheStrategy, PageKey};
    use crate::read::BatchReader;
    use crate::sst::index::{Indexer, IndexerBuilder};
    use crate::sst::parquet::format::WriteFormat;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::sst::parquet::writer::ParquetWriter;
    use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::{check_reader_result, TestEnv};

    const FILE_DIR: &str = "/";

    #[derive(Clone)]
    struct FixedPathProvider {
        file_id: FileId,
    }

    impl FilePathProvider for FixedPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            location::index_file_path(FILE_DIR, self.file_id)
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            location::sst_file_path(FILE_DIR, self.file_id)
        }
    }

    struct NoopIndexBuilder;

    #[async_trait::async_trait]
    impl IndexerBuilder for NoopIndexBuilder {
        async fn build(&self, _file_id: FileId) -> Indexer {
            Indexer::default()
        }
    }

    #[tokio::test]
    async fn test_write_read() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = FixedPathProvider {
            file_id: handle.file_id(),
        };
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            file_path,
        )
        .await;

        let info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        assert_eq!(200, info.num_rows);
        assert!(info.file_size > 0);
        assert_eq!(
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(199)
            ),
            info.time_range
        );

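        // Read the file back; the expected batches reflect the 50-row row groups
        // written above.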
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
                new_batch_by_range(&["b", "f"], 0, 40),
                new_batch_by_range(&["b", "h"], 100, 150),
                new_batch_by_range(&["b", "h"], 150, 200),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_with_cache() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Enable page cache.
        let cache = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .page_cache_size(64 * 1024 * 1024)
                .build(),
        ));
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .cache(cache.clone());
        for _ in 0..3 {
            let mut reader = builder.build().await.unwrap();
            check_reader_result(
                &mut reader,
                &[
                    new_batch_by_range(&["a", "d"], 0, 50),
                    new_batch_by_range(&["a", "d"], 50, 60),
                    new_batch_by_range(&["b", "f"], 0, 40),
                    new_batch_by_range(&["b", "h"], 100, 150),
                    new_batch_by_range(&["b", "h"], 150, 200),
                ],
            )
            .await;
        }

        // Doesn't have compressed page cached.
        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
        assert!(cache.get_pages(&page_key).is_none());

        // Cache 4 row groups.
        for i in 0..4 {
            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
            assert!(cache.get_pages(&page_key).is_some());
        }
        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
        assert!(cache.get_pages(&page_key).is_none());
    }

    #[tokio::test]
    async fn test_parquet_metadata_eq() {
        // Create test env.
        let mut env = crate::test_util::TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        // Write the SST file and get SST info.
        // SST info contains the parquet metadata, which is converted from FileMetaData.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        let sst_info = writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let writer_metadata = sst_info.file_metadata.unwrap();

        // Read the SST file metadata.
        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let reader = builder.build().await.unwrap();
        let reader_metadata = reader.parquet_metadata();

        assert_parquet_metadata_eq(writer_metadata, reader_metadata)
    }

    #[tokio::test]
    async fn test_read_with_tag_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("tag_0"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_by_range(&["a", "d"], 0, 50),
                new_batch_by_range(&["a", "d"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_read_empty_batch() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "z"], 0, 0),
            new_batch_by_range(&["a", "z"], 100, 100),
            new_batch_by_range(&["a", "z"], 200, 230),
        ]);
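        // The first two ranges above are empty (0..0 and 100..100), so only the
        // 200..230 rows should be written and read back.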
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;
        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
    }

    #[tokio::test]
    async fn test_read_with_field_filter() {
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let metadata = Arc::new(sst_region_metadata());
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Use a small row group size for test.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };
        // Prepare data.
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            FixedPathProvider {
                file_id: handle.file_id(),
            },
        )
        .await;

        writer
            .write_all(source, None, &write_opts)
            .await
            .unwrap()
            .remove(0);

        // Predicate
        let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::from_name("field_0"))),
            op: Operator::GtEq,
            right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
        })]));

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
            .predicate(predicate);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
    }

    #[tokio::test]
    async fn test_read_large_binary() {
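        // Writes an SST whose binary column is stored as LargeBinary instead of
        // Binary, then checks that the reader still returns the expected batches.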
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let handle = sst_file_handle(0, 1000);
        let file_path = handle.file_path(FILE_DIR);

        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

        let metadata = build_test_binary_test_region_metadata();
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let props_builder = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_encoding(Encoding::PLAIN)
            .set_max_row_group_size(write_opts.row_group_size);

        let writer_props = props_builder.build();

        let write_format = WriteFormat::new(metadata);
        let fields: Vec<_> = write_format
            .arrow_schema()
            .fields()
            .into_iter()
            .map(|field| {
                let data_type = field.data_type().clone();
                if data_type == DataType::Binary {
                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
                } else {
                    Field::new(field.name(), data_type, field.is_nullable())
                }
            })
            .collect();

        let arrow_schema = Arc::new(Schema::new(fields));

        // Ensures field_0 has LargeBinary type.
        assert_eq!(
            &DataType::LargeBinary,
            arrow_schema.field_with_name("field_0").unwrap().data_type()
        );
        let mut writer = AsyncArrowWriter::try_new(
            object_store
                .writer_with(&file_path)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .map(|w| w.into_futures_async_write().compat_write())
                .unwrap(),
            arrow_schema.clone(),
            Some(writer_props),
        )
        .unwrap();

        let batch = new_batch_with_binary(&["a"], 0, 60);
        let arrow_batch = write_format.convert_batch(&batch).unwrap();
        let arrays: Vec<_> = arrow_batch
            .columns()
            .iter()
            .map(|array| {
                let data_type = array.data_type().clone();
                if data_type == DataType::Binary {
                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
                } else {
                    array.clone()
                }
            })
            .collect();
        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

        writer.write(&result).await.unwrap();
        writer.close().await.unwrap();

        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
        let mut reader = builder.build().await.unwrap();
        check_reader_result(
            &mut reader,
            &[
                new_batch_with_binary(&["a"], 0, 50),
                new_batch_with_binary(&["a"], 50, 60),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_write_multiple_files() {
        common_telemetry::init_default_ut_logging();
        // Create test env.
        let mut env = TestEnv::new();
        let object_store = env.init_object_store_manager();
        let metadata = Arc::new(sst_region_metadata());
        let batches = &[
            new_batch_by_range(&["a", "d"], 0, 1000),
            new_batch_by_range(&["b", "f"], 0, 1000),
            new_batch_by_range(&["b", "h"], 100, 200),
            new_batch_by_range(&["b", "h"], 200, 300),
            new_batch_by_range(&["b", "h"], 300, 1000),
        ];
        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();

        let source = new_source(batches);
        let write_opts = WriteOptions {
            row_group_size: 50,
            max_file_size: Some(1024 * 16),
            ..Default::default()
        };
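        // The 16 KiB max file size should make the writer split the output into
        // multiple files; the assertion below expects exactly two.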

        let path_provider = RegionFilePathFactory {
            region_dir: "test".to_string(),
        };
        let mut writer = ParquetWriter::new_with_object_store(
            object_store.clone(),
            metadata.clone(),
            NoopIndexBuilder,
            path_provider,
        )
        .await;

        let files = writer.write_all(source, None, &write_opts).await.unwrap();
        assert_eq!(2, files.len());

        let mut rows_read = 0;
        for f in &files {
            let file_handle = sst_file_handle_with_file_id(
                f.file_id,
                f.time_range.0.value(),
                f.time_range.1.value(),
            );
            let builder =
                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
            let mut reader = builder.build().await.unwrap();
            while let Some(batch) = reader.next_batch().await.unwrap() {
                rows_read += batch.num_rows();
            }
        }
        assert_eq!(total_rows, rows_read);
    }
}