mito2/sst/parquet/
helper.rs1use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use bytes::Bytes;
20use common_telemetry::trace;
21use object_store::ObjectStore;
22use parquet::basic::ColumnOrder;
23use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
24use parquet::format;
25use parquet::schema::types::{SchemaDescriptor, from_thrift};
26use snafu::ResultExt;
27
28use crate::error;
29use crate::error::Result;
30
31pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
34 let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
35 let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
36
37 let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
38 for rg in t_file_metadata.row_groups {
39 row_groups.push(
40 RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
41 .context(error::ConvertMetaDataSnafu)?,
42 );
43 }
44 let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
45
46 let file_metadata = FileMetaData::new(
47 t_file_metadata.version,
48 t_file_metadata.num_rows,
49 t_file_metadata.created_by,
50 t_file_metadata.key_value_metadata,
51 schema_desc_ptr,
52 column_orders,
53 );
54 Ok(ParquetMetaData::new(file_metadata, row_groups))
57}
58
59fn parse_column_orders(
63 t_column_orders: Option<Vec<format::ColumnOrder>>,
64 schema_descr: &SchemaDescriptor,
65) -> Option<Vec<ColumnOrder>> {
66 match t_column_orders {
67 Some(orders) => {
68 assert_eq!(
70 orders.len(),
71 schema_descr.num_columns(),
72 "Column order length mismatch"
73 );
74 let mut res = Vec::with_capacity(schema_descr.num_columns());
75 for (i, column) in schema_descr.columns().iter().enumerate() {
76 match orders[i] {
77 format::ColumnOrder::TYPEORDER(_) => {
78 let sort_order = ColumnOrder::get_sort_order(
79 column.logical_type(),
80 column.converted_type(),
81 column.physical_type(),
82 );
83 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
84 }
85 }
86 }
87 Some(res)
88 }
89 None => None,
90 }
91}
92
/// Value passed to the object store reader's `concurrent` setting in
/// `fetch_byte_ranges`.
const FETCH_PARALLELISM: usize = 8;
/// Gap size in bytes (512 KiB) passed to the object store reader's `gap`
/// setting in `fetch_byte_ranges`. Visible to the rest of the crate.
pub(crate) const MERGE_GAP: usize = 512 << 10;
95
96pub async fn fetch_byte_ranges(
101 file_path: &str,
102 object_store: ObjectStore,
103 ranges: &[Range<u64>],
104) -> object_store::Result<Vec<Bytes>> {
105 let total_size = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
106 let start = Instant::now();
107
108 let result = object_store
109 .reader_with(file_path)
110 .concurrent(FETCH_PARALLELISM)
111 .gap(MERGE_GAP)
112 .await?
113 .fetch(ranges.to_vec())
114 .await?
115 .into_iter()
116 .map(|buf| buf.to_bytes())
117 .collect::<Vec<_>>();
118
119 trace!(
120 "Fetch {} bytes from '{}' in object store, cost: {:?}",
121 total_size,
122 file_path,
123 start.elapsed()
124 );
125
126 Ok(result)
127}