mito2/sst/parquet/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::result::Result as StdResult;
16
17use bytes::Bytes;
18use futures::FutureExt;
19use futures::future::BoxFuture;
20use object_store::ObjectStore;
21use parquet::arrow::async_reader::MetadataFetch;
22use parquet::errors::{ParquetError, Result as ParquetResult};
23use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
24use snafu::{IntoError as _, ResultExt};
25
26use crate::error::{self, Result};
27
/// Estimated number of bytes (64 KiB) that must be read from the end of a parquet file
/// to cover its footer and metadata.
///
/// Passed to the parquet metadata reader as its prefetch hint.
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
30
31pub(crate) struct MetadataLoader<'a> {
32    // An object store that supports async read
33    object_store: ObjectStore,
34    // The path of parquet file
35    file_path: &'a str,
36    // The size of parquet file
37    file_size: u64,
38}
39
40impl<'a> MetadataLoader<'a> {
41    /// Create a new parquet metadata loader.
42    pub fn new(
43        object_store: ObjectStore,
44        file_path: &'a str,
45        file_size: u64,
46    ) -> MetadataLoader<'a> {
47        Self {
48            object_store,
49            file_path,
50            file_size,
51        }
52    }
53
54    /// Get the size of parquet file. If file_size is 0, stat the object store to get the size.
55    async fn get_file_size(&self) -> Result<u64> {
56        let file_size = match self.file_size {
57            0 => self
58                .object_store
59                .stat(self.file_path)
60                .await
61                .context(error::OpenDalSnafu)?
62                .content_length(),
63            other => other,
64        };
65        Ok(file_size)
66    }
67
68    pub async fn load(&self) -> Result<ParquetMetaData> {
69        let path = self.file_path;
70        let file_size = self.get_file_size().await?;
71        let reader =
72            ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
73
74        let fetch = ObjectStoreFetch {
75            object_store: &self.object_store,
76            file_path: self.file_path,
77        };
78
79        reader
80            .load_and_finish(fetch, file_size)
81            .await
82            .map_err(|e| match unbox_external_error(e) {
83                Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
84                Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
85            })
86    }
87}
88
89/// Unpack ParquetError to get object_store::Error if possible.
90fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
91    match e {
92        ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
93            Ok(os_err) => Ok(*os_err),
94            Err(parquet_error) => Err(ParquetError::External(parquet_error)),
95        },
96        other => Err(other),
97    }
98}
99
100struct ObjectStoreFetch<'a> {
101    object_store: &'a ObjectStore,
102    file_path: &'a str,
103}
104
105impl MetadataFetch for ObjectStoreFetch<'_> {
106    fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
107        async move {
108            let data = self
109                .object_store
110                .read_with(self.file_path)
111                .range(range)
112                .await
113                .map_err(|e| ParquetError::External(Box::new(e)))?;
114            Ok(data.to_bytes())
115        }
116        .boxed()
117    }
118}