mito2/sst/parquet/metadata.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use object_store::ObjectStore;
16use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
17use parquet::file::FOOTER_SIZE;
18use snafu::ResultExt;
19
20use crate::error::{self, Result};
21
/// Estimated number of bytes, read from the end of a parquet file, that should cover both the
/// footer and the file metadata (64 KiB).
const DEFAULT_PREFETCH_SIZE: u64 = 1 << 16;
24
25/// Load the metadata of parquet file in an async way.
26pub(crate) struct MetadataLoader<'a> {
27 // An object store that supports async read
28 object_store: ObjectStore,
29 // The path of parquet file
30 file_path: &'a str,
31 // The size of parquet file
32 file_size: u64,
33}
34
35impl<'a> MetadataLoader<'a> {
36 /// Create a new parquet metadata loader.
37 pub fn new(
38 object_store: ObjectStore,
39 file_path: &'a str,
40 file_size: u64,
41 ) -> MetadataLoader<'a> {
42 Self {
43 object_store,
44 file_path,
45 file_size,
46 }
47 }
48
49 /// Async load the metadata of parquet file.
50 ///
51 /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
52 /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
53 ///
54 /// Parquet File Format:
55 /// ```text
56 /// ┌───────────────────────────────────┐
57 /// |4-byte magic number "PAR1" |
58 /// |───────────────────────────────────|
59 /// |Column 1 Chunk 1 + Column Metadata |
60 /// |Column 2 Chunk 1 + Column Metadata |
61 /// |... |
62 /// |Column N Chunk M + Column Metadata |
63 /// |───────────────────────────────────|
64 /// |File Metadata |
65 /// |───────────────────────────────────|
66 /// |4-byte length of file metadata |
67 /// |4-byte magic number "PAR1" |
68 /// └───────────────────────────────────┘
69 /// ```
70 ///
71 /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
72 pub async fn load(&self) -> Result<ParquetMetaData> {
73 let object_store = &self.object_store;
74 let path = self.file_path;
75 let file_size = self.get_file_size().await?;
76
77 if file_size < FOOTER_SIZE as u64 {
78 return error::InvalidParquetSnafu {
79 file: path,
80 reason: "file size is smaller than footer size",
81 }
82 .fail();
83 }
84
85 // Prefetch bytes for metadata from the end and process the footer
86 let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
87 let buffer = object_store
88 .read_with(path)
89 .range(buffer_start..file_size)
90 .await
91 .context(error::OpenDalSnafu)?
92 .to_vec();
93 let buffer_len = buffer.len();
94
95 let mut footer = [0; 8];
96 footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
97
98 let metadata_len = ParquetMetaDataReader::decode_footer(&footer).map_err(|e| {
99 error::InvalidParquetSnafu {
100 file: path,
101 reason: format!("failed to decode footer, {e}"),
102 }
103 .build()
104 })? as u64;
105
106 if file_size - (FOOTER_SIZE as u64) < metadata_len {
107 return error::InvalidParquetSnafu {
108 file: path,
109 reason: format!(
110 "the sum of Metadata length {} and Footer size {} is larger than file size {}",
111 metadata_len, FOOTER_SIZE, file_size
112 ),
113 }
114 .fail();
115 }
116
117 if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
118 // The whole metadata is in the first read
119 let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
120 let metadata = ParquetMetaDataReader::decode_metadata(
121 &buffer[metadata_start..buffer_len - FOOTER_SIZE],
122 )
123 .map_err(|e| {
124 error::InvalidParquetSnafu {
125 file: path,
126 reason: format!("failed to decode metadata, {e}"),
127 }
128 .build()
129 })?;
130 Ok(metadata)
131 } else {
132 // The metadata is out of buffer, need to make a second read
133 let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
134 let data = object_store
135 .read_with(path)
136 .range(metadata_start..(file_size - FOOTER_SIZE as u64))
137 .await
138 .context(error::OpenDalSnafu)?
139 .to_vec();
140
141 let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
142 error::InvalidParquetSnafu {
143 file: path,
144 reason: format!("failed to decode metadata, {e}"),
145 }
146 .build()
147 })?;
148 Ok(metadata)
149 }
150 }
151
152 /// Get the size of parquet file.
153 async fn get_file_size(&self) -> Result<u64> {
154 let file_size = match self.file_size {
155 0 => self
156 .object_store
157 .stat(self.file_path)
158 .await
159 .context(error::OpenDalSnafu)?
160 .content_length(),
161 other => other,
162 };
163 Ok(file_size)
164 }
165}