// File: mito2/sst/parquet/helper.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use bytes::Bytes;
20use common_telemetry::trace;
21use object_store::ObjectStore;
22use parquet::basic::ColumnOrder;
23use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
24use parquet::format;
25use parquet::schema::types::{SchemaDescriptor, from_thrift};
26use snafu::ResultExt;
27
28use crate::error;
29use crate::error::Result;
30
31// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
32/// Convert [format::FileMetaData] to [ParquetMetaData]
33pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
34    let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
35    let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
36
37    let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
38    for rg in t_file_metadata.row_groups {
39        row_groups.push(
40            RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
41                .context(error::ConvertMetaDataSnafu)?,
42        );
43    }
44    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
45
46    let file_metadata = FileMetaData::new(
47        t_file_metadata.version,
48        t_file_metadata.num_rows,
49        t_file_metadata.created_by,
50        t_file_metadata.key_value_metadata,
51        schema_desc_ptr,
52        column_orders,
53    );
54    // There may be a problem owing to lacking of column_index and offset_index,
55    // if we open page index in the future.
56    Ok(ParquetMetaData::new(file_metadata, row_groups))
57}
58
59// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
60/// Parses column orders from Thrift definition.
61/// If no column orders are defined, returns `None`.
62fn parse_column_orders(
63    t_column_orders: Option<Vec<format::ColumnOrder>>,
64    schema_descr: &SchemaDescriptor,
65) -> Option<Vec<ColumnOrder>> {
66    match t_column_orders {
67        Some(orders) => {
68            // Should always be the case
69            assert_eq!(
70                orders.len(),
71                schema_descr.num_columns(),
72                "Column order length mismatch"
73            );
74            let mut res = Vec::with_capacity(schema_descr.num_columns());
75            for (i, column) in schema_descr.columns().iter().enumerate() {
76                match orders[i] {
77                    format::ColumnOrder::TYPEORDER(_) => {
78                        let sort_order = ColumnOrder::get_sort_order(
79                            column.logical_type(),
80                            column.converted_type(),
81                            column.physical_type(),
82                        );
83                        res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
84                    }
85                }
86            }
87            Some(res)
88        }
89        None => None,
90    }
91}
92
/// Number of concurrent range-fetch operations issued against the object store.
const FETCH_PARALLELISM: usize = 8;
/// Maximum gap (in bytes, 512 KiB) between requested ranges that the reader
/// may bridge to merge small reads into one larger request.
pub(crate) const MERGE_GAP: usize = 512 * 1024;
95
96/// Asynchronously fetches byte ranges from an object store.
97///
98/// * `FETCH_PARALLELISM` - The number of concurrent fetch operations.
99/// * `MERGE_GAP` - The maximum gap size (in bytes) to merge small byte ranges for optimized fetching.
100pub async fn fetch_byte_ranges(
101    file_path: &str,
102    object_store: ObjectStore,
103    ranges: &[Range<u64>],
104) -> object_store::Result<Vec<Bytes>> {
105    let total_size = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
106    let start = Instant::now();
107
108    let result = object_store
109        .reader_with(file_path)
110        .concurrent(FETCH_PARALLELISM)
111        .gap(MERGE_GAP)
112        .await?
113        .fetch(ranges.to_vec())
114        .await?
115        .into_iter()
116        .map(|buf| buf.to_bytes())
117        .collect::<Vec<_>>();
118
119    trace!(
120        "Fetch {} bytes from '{}' in object store, cost: {:?}",
121        total_size,
122        file_path,
123        start.elapsed()
124    );
125
126    Ok(result)
127}