mito2/sst/parquet/metadata.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

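//! Helpers for loading [`ParquetMetaData`] from an object store and for
//! extracting the range of encoded primary keys from parquet column statistics.
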
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::MetadataFetch;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use parquet::file::statistics::Statistics;
use snafu::{IntoError as _, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

use crate::error::{self, Result};
use crate::sst::parquet::reader::MetadataCacheMetrics;

/// The estimated size of the footer and metadata that need to be read from the end of a parquet file.
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;

pub(crate) struct MetadataLoader<'a> {
    // An object store that supports async reads.
    object_store: ObjectStore,
    // The path of the parquet file.
    file_path: &'a str,
    // The size of the parquet file. A value of 0 means the size is unknown
    // and will be fetched by stating the object store.
    file_size: u64,
    // Controls whether the page index is loaded along with the metadata.
    page_index_policy: PageIndexPolicy,
}

impl<'a> MetadataLoader<'a> {
    /// Create a new parquet metadata loader.
    pub fn new(
        object_store: ObjectStore,
        file_path: &'a str,
        file_size: u64,
    ) -> MetadataLoader<'a> {
        Self {
            object_store,
            file_path,
            file_size,
            page_index_policy: Default::default(),
        }
    }
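    /// Sets the policy that controls whether the page index is read together with the metadata.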
    pub(crate) fn with_page_index_policy(&mut self, page_index_policy: PageIndexPolicy) {
        self.page_index_policy = page_index_policy;
    }

    /// Gets the size of the parquet file. If `file_size` is 0, stats the object store to get the size.
    async fn get_file_size(&self) -> Result<u64> {
        let file_size = match self.file_size {
            0 => self
                .object_store
                .stat(self.file_path)
                .await
                .context(error::OpenDalSnafu)?
                .content_length(),
            other => other,
        };
        Ok(file_size)
    }
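    /// Loads the [`ParquetMetaData`] of the file, recording the number of reads and
    /// bytes read into `cache_metrics`.
    ///
    /// A minimal usage sketch (the surrounding setup and path are illustrative, not part of this module):
    ///
    /// ```ignore
    /// let loader = MetadataLoader::new(object_store, "region/1/file.parquet", file_size);
    /// let metadata = loader.load(&mut cache_metrics).await?;
    /// ```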
    pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
        let path = self.file_path;
        let file_size = self.get_file_size().await?;
        let reader = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize))
            .with_page_index_policy(self.page_index_policy);

        // Counters shared with the fetcher to track how much data the reader requests.
        let num_reads = AtomicUsize::new(0);
        let bytes_read = AtomicU64::new(0);
        let fetch = ObjectStoreFetch {
            object_store: &self.object_store,
            file_path: self.file_path,
            num_reads: &num_reads,
            bytes_read: &bytes_read,
        };

        let metadata = reader
            .load_and_finish(fetch, file_size)
            .await
            .map_err(|e| match unbox_external_error(e) {
                // Surface the underlying object store error if the parquet error wraps one.
                Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
                Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
            })?;

        cache_metrics.num_reads = num_reads.into_inner();
        cache_metrics.bytes_read = bytes_read.into_inner();

        Ok(metadata)
    }
}

/// Unpack ParquetError to get object_store::Error if possible.
fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, ParquetError> {
    match e {
        ParquetError::External(boxed_err) => match boxed_err.downcast::<object_store::Error>() {
            Ok(os_err) => Ok(*os_err),
            Err(parquet_error) => Err(ParquetError::External(parquet_error)),
        },
        other => Err(other),
    }
}
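/// Extracts the minimum and maximum encoded primary keys across all row groups,
/// based on the statistics of the `__primary_key` column.
///
/// Returns `None` if the region has no primary key, the column is absent from the
/// parquet schema, or any row group lacks byte-array statistics.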
pub(crate) fn extract_primary_key_range(
    parquet_meta: &ParquetMetaData,
    region_metadata: &RegionMetadata,
) -> Option<(Bytes, Bytes)> {
    if region_metadata.primary_key.is_empty() {
        return None;
    }

    let pk_column_idx = parquet_meta
        .file_metadata()
        .schema_descr()
        .columns()
        .iter()
        .position(|column| column.name() == PRIMARY_KEY_COLUMN_NAME)?;

    let mut min: Option<Bytes> = None;
    let mut max: Option<Bytes> = None;

    for row_group in parquet_meta.row_groups() {
        // The primary key column is stored as byte arrays; bail out if any row group
        // has no statistics or statistics of an unexpected type.
        let Statistics::ByteArray(stats) = row_group.column(pk_column_idx).statistics()? else {
            return None;
        };

        let row_group_min = Bytes::copy_from_slice(stats.min_bytes_opt()?);
        let row_group_max = Bytes::copy_from_slice(stats.max_bytes_opt()?);
        min = Some(match min {
            Some(current) => current.min(row_group_min),
            None => row_group_min,
        });
        max = Some(match max {
            Some(current) => current.max(row_group_max),
            None => row_group_max,
        });
    }

    min.zip(max)
}
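/// A [`MetadataFetch`] implementation that reads byte ranges from the object store
/// and records the number of reads and bytes read.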
struct ObjectStoreFetch<'a> {
    object_store: &'a ObjectStore,
    file_path: &'a str,
    num_reads: &'a AtomicUsize,
    bytes_read: &'a AtomicU64,
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        let bytes_to_read = range.end - range.start;
        async move {
            let data = self
                .object_store
                .read_with(self.file_path)
                .range(range)
                .await
                .map_err(|e| ParquetError::External(Box::new(e)))?;
            self.num_reads.fetch_add(1, Ordering::Relaxed);
            self.bytes_read.fetch_add(bytes_to_read, Ordering::Relaxed);
            Ok(data.to_bytes())
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, UInt32Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::metadata::{KeyValue, ParquetMetaData};
    use parquet::file::properties::{EnabledStatistics, WriterProperties};

    use super::*;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::sst_util::sst_region_metadata;

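    /// Writes an in-memory parquet file with the given row group sizes and an optional
    /// `__primary_key` dictionary column, then returns its [`ParquetMetaData`].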
    fn build_test_metadata(
        include_primary_key: bool,
        primary_keys: &[&[u8]],
        row_group_sizes: &[usize],
        stats_enabled: EnabledStatistics,
    ) -> ParquetMetaData {
        let total_rows = row_group_sizes.iter().sum::<usize>();
        let mut fields = vec![Field::new("field", ArrowDataType::Int64, true)];
        let mut columns: Vec<ArrayRef> =
            vec![Arc::new(Int64Array::from_iter_values(0..total_rows as i64))];
        if include_primary_key {
            assert_eq!(total_rows, primary_keys.len());
            fields.push(Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ));
            let values = Arc::new(BinaryArray::from_iter_values(primary_keys.iter().copied()));
            let keys = UInt32Array::from_iter_values(0..primary_keys.len() as u32);
            columns.push(Arc::new(DictionaryArray::new(keys, values)));
        }

        let schema = Arc::new(Schema::new(fields));
        let region_metadata = Arc::new(sst_region_metadata());
        let key_value = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            region_metadata.to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value]))
            .set_statistics_enabled(stats_enabled)
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), Some(props)).unwrap();
        let mut offset = 0;
        for row_group_size in row_group_sizes {
            let batch = RecordBatch::try_new(
                schema.clone(),
                columns
                    .iter()
                    .map(|column| column.slice(offset, *row_group_size))
                    .collect(),
            )
            .unwrap();
            writer.write(&batch).unwrap();
            offset += row_group_size;
        }
        writer.close().unwrap();

        ParquetRecordBatchReaderBuilder::try_new(Bytes::from(parquet_bytes))
            .unwrap()
            .metadata()
            .as_ref()
            .clone()
    }

    #[test]
    fn test_extract_primary_key_range_returns_none_when_column_absent() {
        let metadata = build_test_metadata(false, &[], &[1], EnabledStatistics::Page);
        let region_metadata = sst_region_metadata();

        assert_eq!(None, extract_primary_key_range(&metadata, &region_metadata));
    }

    #[test]
    fn test_extract_primary_key_range_folds_row_group_stats() {
        let metadata = build_test_metadata(
            true,
            &[b"bbb", b"ccc", b"aaa", b"zzz"],
            &[2, 2],
            EnabledStatistics::Page,
        );
        let region_metadata = sst_region_metadata();

        assert_eq!(
            Some((Bytes::from_static(b"aaa"), Bytes::from_static(b"zzz"))),
            extract_primary_key_range(&metadata, &region_metadata)
        );
    }

    #[test]
    fn test_extract_primary_key_range_returns_none_when_any_rg_stats_missing() {
        let metadata = build_test_metadata(
            true,
            &[b"bbb", b"ccc", b"aaa", b"zzz"],
            &[2, 2],
            EnabledStatistics::None,
        );
        let region_metadata = sst_region_metadata();

        assert_eq!(None, extract_primary_key_range(&metadata, &region_metadata));
    }
299}