mito2/sst/parquet/
metadata.rs1use std::result::Result as StdResult;
16
17use bytes::Bytes;
18use futures::FutureExt;
19use futures::future::BoxFuture;
20use object_store::ObjectStore;
21use parquet::arrow::async_reader::MetadataFetch;
22use parquet::errors::{ParquetError, Result as ParquetResult};
23use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
24use snafu::{IntoError as _, ResultExt};
25
26use crate::error::{self, Result};
27
/// Prefetch hint (64 KiB) handed to `ParquetMetaDataReader::with_prefetch_hint`
/// when loading the metadata of a parquet file.
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
30
31pub(crate) struct MetadataLoader<'a> {
32 object_store: ObjectStore,
34 file_path: &'a str,
36 file_size: u64,
38}
39
40impl<'a> MetadataLoader<'a> {
41 pub fn new(
43 object_store: ObjectStore,
44 file_path: &'a str,
45 file_size: u64,
46 ) -> MetadataLoader<'a> {
47 Self {
48 object_store,
49 file_path,
50 file_size,
51 }
52 }
53
54 async fn get_file_size(&self) -> Result<u64> {
56 let file_size = match self.file_size {
57 0 => self
58 .object_store
59 .stat(self.file_path)
60 .await
61 .context(error::OpenDalSnafu)?
62 .content_length(),
63 other => other,
64 };
65 Ok(file_size)
66 }
67
68 pub async fn load(&self) -> Result<ParquetMetaData> {
69 let path = self.file_path;
70 let file_size = self.get_file_size().await?;
71 let reader =
72 ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
73
74 let fetch = ObjectStoreFetch {
75 object_store: &self.object_store,
76 file_path: self.file_path,
77 };
78
79 reader
80 .load_and_finish(fetch, file_size)
81 .await
82 .map_err(|e| match unbox_external_error(e) {
83 Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
84 Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
85 })
86 }
87}
88
89fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
91 match e {
92 ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
93 Ok(os_err) => Ok(*os_err),
94 Err(parquet_error) => Err(ParquetError::External(parquet_error)),
95 },
96 other => Err(other),
97 }
98}
99
100struct ObjectStoreFetch<'a> {
101 object_store: &'a ObjectStore,
102 file_path: &'a str,
103}
104
105impl MetadataFetch for ObjectStoreFetch<'_> {
106 fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
107 async move {
108 let data = self
109 .object_store
110 .read_with(self.file_path)
111 .range(range)
112 .await
113 .map_err(|e| ParquetError::External(Box::new(e)))?;
114 Ok(data.to_bytes())
115 }
116 .boxed()
117 }
118}