mito2/sst/parquet/
helper.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use object_store::ObjectStore;
20use parquet::basic::ColumnOrder;
21use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
22use parquet::format;
23use parquet::schema::types::{from_thrift, SchemaDescriptor};
24use snafu::ResultExt;
25
26use crate::error;
27use crate::error::Result;
28
29// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
30/// Convert [format::FileMetaData] to [ParquetMetaData]
31pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
32    let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
33    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
34
35    let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
36    for rg in t_file_metadata.row_groups {
37        row_groups.push(
38            RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
39                .context(error::ConvertMetaDataSnafu)?,
40        );
41    }
42    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
43
44    let file_metadata = FileMetaData::new(
45        t_file_metadata.version,
46        t_file_metadata.num_rows,
47        t_file_metadata.created_by,
48        t_file_metadata.key_value_metadata,
49        schema_desc_ptr,
50        column_orders,
51    );
52    // There may be a problem owing to lacking of column_index and offset_index,
53    // if we open page index in the future.
54    Ok(ParquetMetaData::new(file_metadata, row_groups))
55}
56
57// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
58/// Parses column orders from Thrift definition.
59/// If no column orders are defined, returns `None`.
60fn parse_column_orders(
61    t_column_orders: Option<Vec<format::ColumnOrder>>,
62    schema_descr: &SchemaDescriptor,
63) -> Option<Vec<ColumnOrder>> {
64    match t_column_orders {
65        Some(orders) => {
66            // Should always be the case
67            assert_eq!(
68                orders.len(),
69                schema_descr.num_columns(),
70                "Column order length mismatch"
71            );
72            let mut res = Vec::with_capacity(schema_descr.num_columns());
73            for (i, column) in schema_descr.columns().iter().enumerate() {
74                match orders[i] {
75                    format::ColumnOrder::TYPEORDER(_) => {
76                        let sort_order = ColumnOrder::get_sort_order(
77                            column.logical_type(),
78                            column.converted_type(),
79                            column.physical_type(),
80                        );
81                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
82                    }
83                }
84            }
85            Some(res)
86        }
87        None => None,
88    }
89}
90
/// Number of concurrent fetch operations used when reading byte ranges.
const FETCH_PARALLELISM: usize = 8;
/// Maximum gap, in bytes, between byte ranges that are merged into one fetch (512 KiB).
const MERGE_GAP: usize = 512 * 1024;
93
94/// Asynchronously fetches byte ranges from an object store.
95///
96/// * `FETCH_PARALLELISM` - The number of concurrent fetch operations.
97/// * `MERGE_GAP` - The maximum gap size (in bytes) to merge small byte ranges for optimized fetching.
98pub async fn fetch_byte_ranges(
99    file_path: &str,
100    object_store: ObjectStore,
101    ranges: &[Range<u64>],
102) -> object_store::Result<Vec<Bytes>> {
103    Ok(object_store
104        .reader_with(file_path)
105        .concurrent(FETCH_PARALLELISM)
106        .gap(MERGE_GAP)
107        .await?
108        .fetch(ranges.to_vec())
109        .await?
110        .into_iter()
111        .map(|buf| buf.to_bytes())
112        .collect::<Vec<_>>())
113}