mito2/sst/parquet/metadata.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use object_store::ObjectStore;
16use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
17use parquet::file::FOOTER_SIZE;
18use snafu::ResultExt;
19
20use crate::error::{self, Result};
21
/// Estimated number of bytes to read from the end of a parquet file so that a single read
/// covers both the footer and the file metadata (64 KiB).
const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
24
25/// Load the metadata of parquet file in an async way.
26pub(crate) struct MetadataLoader<'a> {
27 // An object store that supports async read
28 object_store: ObjectStore,
29 // The path of parquet file
30 file_path: &'a str,
31 // The size of parquet file
32 file_size: u64,
33}
34
35impl<'a> MetadataLoader<'a> {
36 /// Create a new parquet metadata loader.
37 pub fn new(
38 object_store: ObjectStore,
39 file_path: &'a str,
40 file_size: u64,
41 ) -> MetadataLoader<'a> {
42 Self {
43 object_store,
44 file_path,
45 file_size,
46 }
47 }
48
49 /// Async load the metadata of parquet file.
50 ///
51 /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
52 /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
53 ///
54 /// Parquet File Format:
55 /// ```text
56 /// ┌───────────────────────────────────┐
57 /// |4-byte magic number "PAR1" |
58 /// |───────────────────────────────────|
59 /// |Column 1 Chunk 1 + Column Metadata |
60 /// |Column 2 Chunk 1 + Column Metadata |
61 /// |... |
62 /// |Column N Chunk M + Column Metadata |
63 /// |───────────────────────────────────|
64 /// |File Metadata |
65 /// |───────────────────────────────────|
66 /// |4-byte length of file metadata |
67 /// |4-byte magic number "PAR1" |
68 /// └───────────────────────────────────┘
69 /// ```
70 ///
71 /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
72 pub async fn load(&self) -> Result<ParquetMetaData> {
73 let object_store = &self.object_store;
74 let path = self.file_path;
75 let file_size = self.get_file_size().await?;
76
77 if file_size < FOOTER_SIZE as u64 {
78 return error::InvalidParquetSnafu {
79 file: path,
80 reason: "file size is smaller than footer size",
81 }
82 .fail();
83 }
84
85 // Prefetch bytes for metadata from the end and process the footer
86 let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
87 let buffer = object_store
88 .read_with(path)
89 .range(buffer_start..file_size)
90 .await
91 .context(error::OpenDalSnafu)?
92 .to_vec();
93 let buffer_len = buffer.len();
94
95 let mut footer = [0; 8];
96 footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
97
98 let footer_tail = ParquetMetaDataReader::decode_footer_tail(&footer).map_err(|e| {
99 error::InvalidParquetSnafu {
100 file: path,
101 reason: format!("failed to decode footer, {e}"),
102 }
103 .build()
104 })?;
105 let metadata_len = footer_tail.metadata_length() as u64;
106
107 if file_size - (FOOTER_SIZE as u64) < metadata_len {
108 return error::InvalidParquetSnafu {
109 file: path,
110 reason: format!(
111 "the sum of Metadata length {} and Footer size {} is larger than file size {}",
112 metadata_len, FOOTER_SIZE, file_size
113 ),
114 }
115 .fail();
116 }
117
118 if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
119 // The whole metadata is in the first read
120 let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
121 let metadata = ParquetMetaDataReader::decode_metadata(
122 &buffer[metadata_start..buffer_len - FOOTER_SIZE],
123 )
124 .map_err(|e| {
125 error::InvalidParquetSnafu {
126 file: path,
127 reason: format!("failed to decode metadata, {e}"),
128 }
129 .build()
130 })?;
131 Ok(metadata)
132 } else {
133 // The metadata is out of buffer, need to make a second read
134 let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
135 let data = object_store
136 .read_with(path)
137 .range(metadata_start..(file_size - FOOTER_SIZE as u64))
138 .await
139 .context(error::OpenDalSnafu)?
140 .to_vec();
141
142 let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
143 error::InvalidParquetSnafu {
144 file: path,
145 reason: format!("failed to decode metadata, {e}"),
146 }
147 .build()
148 })?;
149 Ok(metadata)
150 }
151 }
152
153 /// Get the size of parquet file.
154 async fn get_file_size(&self) -> Result<u64> {
155 let file_size = match self.file_size {
156 0 => self
157 .object_store
158 .stat(self.file_path)
159 .await
160 .context(error::OpenDalSnafu)?
161 .content_length(),
162 other => other,
163 };
164 Ok(file_size)
165 }
166}