mito2/sst/parquet/metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::result::Result as StdResult;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17
18use bytes::Bytes;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use object_store::ObjectStore;
22use parquet::arrow::async_reader::MetadataFetch;
23use parquet::errors::{ParquetError, Result as ParquetResult};
24use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
25use snafu::{IntoError as _, ResultExt};
26
27use crate::error::{self, Result};
28use crate::sst::parquet::reader::MetadataCacheMetrics;
29
/// Number of bytes (64 KiB) to prefetch from the end of a parquet file.
///
/// This is an estimate of how much tail data is needed to cover both the footer
/// and the encoded metadata in a single read.
const DEFAULT_PREFETCH_SIZE: u64 = 64 << 10;
32
33pub(crate) struct MetadataLoader<'a> {
34    // An object store that supports async read
35    object_store: ObjectStore,
36    // The path of parquet file
37    file_path: &'a str,
38    // The size of parquet file
39    file_size: u64,
40    page_index_policy: PageIndexPolicy,
41}
42
43impl<'a> MetadataLoader<'a> {
44    /// Create a new parquet metadata loader.
45    pub fn new(
46        object_store: ObjectStore,
47        file_path: &'a str,
48        file_size: u64,
49    ) -> MetadataLoader<'a> {
50        Self {
51            object_store,
52            file_path,
53            file_size,
54            page_index_policy: Default::default(),
55        }
56    }
57
58    pub(crate) fn with_page_index_policy(&mut self, page_index_policy: PageIndexPolicy) {
59        self.page_index_policy = page_index_policy;
60    }
61
62    /// Get the size of parquet file. If file_size is 0, stat the object store to get the size.
63    async fn get_file_size(&self) -> Result<u64> {
64        let file_size = match self.file_size {
65            0 => self
66                .object_store
67                .stat(self.file_path)
68                .await
69                .context(error::OpenDalSnafu)?
70                .content_length(),
71            other => other,
72        };
73        Ok(file_size)
74    }
75
76    pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
77        let path = self.file_path;
78        let file_size = self.get_file_size().await?;
79        let reader = ParquetMetaDataReader::new()
80            .with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize))
81            .with_page_index_policy(self.page_index_policy);
82
83        let num_reads = AtomicUsize::new(0);
84        let bytes_read = AtomicU64::new(0);
85        let fetch = ObjectStoreFetch {
86            object_store: &self.object_store,
87            file_path: self.file_path,
88            num_reads: &num_reads,
89            bytes_read: &bytes_read,
90        };
91
92        let metadata = reader
93            .load_and_finish(fetch, file_size)
94            .await
95            .map_err(|e| match unbox_external_error(e) {
96                Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
97                Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
98            })?;
99
100        cache_metrics.num_reads = num_reads.into_inner();
101        cache_metrics.bytes_read = bytes_read.into_inner();
102
103        Ok(metadata)
104    }
105}
106
107/// Unpack ParquetError to get object_store::Error if possible.
108fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
109    match e {
110        ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
111            Ok(os_err) => Ok(*os_err),
112            Err(parquet_error) => Err(ParquetError::External(parquet_error)),
113        },
114        other => Err(other),
115    }
116}
117
118struct ObjectStoreFetch<'a> {
119    object_store: &'a ObjectStore,
120    file_path: &'a str,
121    num_reads: &'a AtomicUsize,
122    bytes_read: &'a AtomicU64,
123}
124
125impl MetadataFetch for ObjectStoreFetch<'_> {
126    fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
127        let bytes_to_read = range.end - range.start;
128        async move {
129            let data = self
130                .object_store
131                .read_with(self.file_path)
132                .range(range)
133                .await
134                .map_err(|e| ParquetError::External(Box::new(e)))?;
135            self.num_reads.fetch_add(1, Ordering::Relaxed);
136            self.bytes_read.fetch_add(bytes_to_read, Ordering::Relaxed);
137            Ok(data.to_bytes())
138        }
139        .boxed()
140    }
141}