mito2/sst/parquet/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use object_store::ObjectStore;
16use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
17use parquet::file::FOOTER_SIZE;
18use snafu::ResultExt;
19
20use crate::error::{self, Result};
21
/// Number of bytes prefetched from the end of a parquet file, estimated to be enough to cover the
/// footer and the File Metadata in a single read (64 KiB).
const DEFAULT_PREFETCH_SIZE: u64 = 1 << 16;
24
25/// Load the metadata of parquet file in an async way.
26pub(crate) struct MetadataLoader<'a> {
27    // An object store that supports async read
28    object_store: ObjectStore,
29    // The path of parquet file
30    file_path: &'a str,
31    // The size of parquet file
32    file_size: u64,
33}
34
35impl<'a> MetadataLoader<'a> {
36    /// Create a new parquet metadata loader.
37    pub fn new(
38        object_store: ObjectStore,
39        file_path: &'a str,
40        file_size: u64,
41    ) -> MetadataLoader<'a> {
42        Self {
43            object_store,
44            file_path,
45            file_size,
46        }
47    }
48
49    /// Async load the metadata of parquet file.
50    ///
51    /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
52    /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
53    ///
54    /// Parquet File Format:
55    /// ```text
56    /// ┌───────────────────────────────────┐
57    /// |4-byte magic number "PAR1"         |
58    /// |───────────────────────────────────|
59    /// |Column 1 Chunk 1 + Column Metadata |
60    /// |Column 2 Chunk 1 + Column Metadata |
61    /// |...                                |
62    /// |Column N Chunk M + Column Metadata |
63    /// |───────────────────────────────────|
64    /// |File Metadata                      |
65    /// |───────────────────────────────────|
66    /// |4-byte length of file metadata     |
67    /// |4-byte magic number "PAR1"         |
68    /// └───────────────────────────────────┘
69    /// ```
70    ///
71    /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
72    pub async fn load(&self) -> Result<ParquetMetaData> {
73        let object_store = &self.object_store;
74        let path = self.file_path;
75        let file_size = self.get_file_size().await?;
76
77        if file_size < FOOTER_SIZE as u64 {
78            return error::InvalidParquetSnafu {
79                file: path,
80                reason: "file size is smaller than footer size",
81            }
82            .fail();
83        }
84
85        // Prefetch bytes for metadata from the end and process the footer
86        let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
87        let buffer = object_store
88            .read_with(path)
89            .range(buffer_start..file_size)
90            .await
91            .context(error::OpenDalSnafu)?
92            .to_vec();
93        let buffer_len = buffer.len();
94
95        let mut footer = [0; 8];
96        footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
97
98        let metadata_len = ParquetMetaDataReader::decode_footer(&footer).map_err(|e| {
99            error::InvalidParquetSnafu {
100                file: path,
101                reason: format!("failed to decode footer, {e}"),
102            }
103            .build()
104        })? as u64;
105
106        if file_size - (FOOTER_SIZE as u64) < metadata_len {
107            return error::InvalidParquetSnafu {
108                file: path,
109                reason: format!(
110                    "the sum of Metadata length {} and Footer size {} is larger than file size {}",
111                    metadata_len, FOOTER_SIZE, file_size
112                ),
113            }
114            .fail();
115        }
116
117        if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
118            // The whole metadata is in the first read
119            let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
120            let metadata = ParquetMetaDataReader::decode_metadata(
121                &buffer[metadata_start..buffer_len - FOOTER_SIZE],
122            )
123            .map_err(|e| {
124                error::InvalidParquetSnafu {
125                    file: path,
126                    reason: format!("failed to decode metadata, {e}"),
127                }
128                .build()
129            })?;
130            Ok(metadata)
131        } else {
132            // The metadata is out of buffer, need to make a second read
133            let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
134            let data = object_store
135                .read_with(path)
136                .range(metadata_start..(file_size - FOOTER_SIZE as u64))
137                .await
138                .context(error::OpenDalSnafu)?
139                .to_vec();
140
141            let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
142                error::InvalidParquetSnafu {
143                    file: path,
144                    reason: format!("failed to decode metadata, {e}"),
145                }
146                .build()
147            })?;
148            Ok(metadata)
149        }
150    }
151
152    /// Get the size of parquet file.
153    async fn get_file_size(&self) -> Result<u64> {
154        let file_size = match self.file_size {
155            0 => self
156                .object_store
157                .stat(self.file_path)
158                .await
159                .context(error::OpenDalSnafu)?
160                .content_length(),
161            other => other,
162        };
163        Ok(file_size)
164    }
165}