mito2/sst/parquet/
helper.rs1use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use object_store::ObjectStore;
20use parquet::basic::ColumnOrder;
21use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
22use parquet::format;
23use parquet::schema::types::{from_thrift, SchemaDescriptor};
24use snafu::ResultExt;
25
26use crate::error;
27use crate::error::Result;
28
29pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
32 let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
33 let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
34
35 let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
36 for rg in t_file_metadata.row_groups {
37 row_groups.push(
38 RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
39 .context(error::ConvertMetaDataSnafu)?,
40 );
41 }
42 let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
43
44 let file_metadata = FileMetaData::new(
45 t_file_metadata.version,
46 t_file_metadata.num_rows,
47 t_file_metadata.created_by,
48 t_file_metadata.key_value_metadata,
49 schema_desc_ptr,
50 column_orders,
51 );
52 Ok(ParquetMetaData::new(file_metadata, row_groups))
55}
56
57fn parse_column_orders(
61 t_column_orders: Option<Vec<format::ColumnOrder>>,
62 schema_descr: &SchemaDescriptor,
63) -> Option<Vec<ColumnOrder>> {
64 match t_column_orders {
65 Some(orders) => {
66 assert_eq!(
68 orders.len(),
69 schema_descr.num_columns(),
70 "Column order length mismatch"
71 );
72 let mut res = Vec::with_capacity(schema_descr.num_columns());
73 for (i, column) in schema_descr.columns().iter().enumerate() {
74 match orders[i] {
75 format::ColumnOrder::TYPEORDER(_) => {
76 let sort_order = ColumnOrder::get_sort_order(
77 column.logical_type(),
78 column.converted_type(),
79 column.physical_type(),
80 );
81 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
82 }
83 }
84 }
85 Some(res)
86 }
87 None => None,
88 }
89}
90
91const FETCH_PARALLELISM: usize = 8;
92const MERGE_GAP: usize = 512 * 1024;
93
94pub async fn fetch_byte_ranges(
99 file_path: &str,
100 object_store: ObjectStore,
101 ranges: &[Range<u64>],
102) -> object_store::Result<Vec<Bytes>> {
103 Ok(object_store
104 .reader_with(file_path)
105 .concurrent(FETCH_PARALLELISM)
106 .gap(MERGE_GAP)
107 .await?
108 .fetch(ranges.to_vec())
109 .await?
110 .into_iter()
111 .map(|buf| buf.to_bytes())
112 .collect::<Vec<_>>())
113}