mito2/sst/parquet/
metadata.rs1use std::result::Result as StdResult;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17
18use bytes::Bytes;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use object_store::ObjectStore;
22use parquet::arrow::async_reader::MetadataFetch;
23use parquet::errors::{ParquetError, Result as ParquetResult};
24use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
25use snafu::{IntoError as _, ResultExt};
26
27use crate::error::{self, Result};
28use crate::sst::parquet::reader::MetadataCacheMetrics;
29
/// Prefetch hint (64 KiB) handed to
/// [`ParquetMetaDataReader::with_prefetch_hint`]: roughly how many bytes the
/// metadata reader may fetch up front when loading the metadata.
const DEFAULT_PREFETCH_SIZE: u64 = 64 << 10;
33pub(crate) struct MetadataLoader<'a> {
34 object_store: ObjectStore,
36 file_path: &'a str,
38 file_size: u64,
40 page_index_policy: PageIndexPolicy,
41}
42
43impl<'a> MetadataLoader<'a> {
44 pub fn new(
46 object_store: ObjectStore,
47 file_path: &'a str,
48 file_size: u64,
49 ) -> MetadataLoader<'a> {
50 Self {
51 object_store,
52 file_path,
53 file_size,
54 page_index_policy: Default::default(),
55 }
56 }
57
58 pub(crate) fn with_page_index_policy(&mut self, page_index_policy: PageIndexPolicy) {
59 self.page_index_policy = page_index_policy;
60 }
61
62 async fn get_file_size(&self) -> Result<u64> {
64 let file_size = match self.file_size {
65 0 => self
66 .object_store
67 .stat(self.file_path)
68 .await
69 .context(error::OpenDalSnafu)?
70 .content_length(),
71 other => other,
72 };
73 Ok(file_size)
74 }
75
76 pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
77 let path = self.file_path;
78 let file_size = self.get_file_size().await?;
79 let reader = ParquetMetaDataReader::new()
80 .with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize))
81 .with_page_index_policy(self.page_index_policy);
82
83 let num_reads = AtomicUsize::new(0);
84 let bytes_read = AtomicU64::new(0);
85 let fetch = ObjectStoreFetch {
86 object_store: &self.object_store,
87 file_path: self.file_path,
88 num_reads: &num_reads,
89 bytes_read: &bytes_read,
90 };
91
92 let metadata = reader
93 .load_and_finish(fetch, file_size)
94 .await
95 .map_err(|e| match unbox_external_error(e) {
96 Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
97 Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
98 })?;
99
100 cache_metrics.num_reads = num_reads.into_inner();
101 cache_metrics.bytes_read = bytes_read.into_inner();
102
103 Ok(metadata)
104 }
105}
106
107fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
109 match e {
110 ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
111 Ok(os_err) => Ok(*os_err),
112 Err(parquet_error) => Err(ParquetError::External(parquet_error)),
113 },
114 other => Err(other),
115 }
116}
117
118struct ObjectStoreFetch<'a> {
119 object_store: &'a ObjectStore,
120 file_path: &'a str,
121 num_reads: &'a AtomicUsize,
122 bytes_read: &'a AtomicU64,
123}
124
125impl MetadataFetch for ObjectStoreFetch<'_> {
126 fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
127 let bytes_to_read = range.end - range.start;
128 async move {
129 let data = self
130 .object_store
131 .read_with(self.file_path)
132 .range(range)
133 .await
134 .map_err(|e| ParquetError::External(Box::new(e)))?;
135 self.num_reads.fetch_add(1, Ordering::Relaxed);
136 self.bytes_read.fetch_add(bytes_to_read, Ordering::Relaxed);
137 Ok(data.to_bytes())
138 }
139 .boxed()
140 }
141}