mito2/sst/parquet/
metadata.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use object_store::ObjectStore;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::FOOTER_SIZE;
use snafu::ResultExt;

use crate::error::{self, Result};

/// Number of bytes fetched from the end of a parquet file in the first read (64 KiB).
///
/// This is an estimate of how much is needed to cover both the footer and the file metadata;
/// if the metadata turns out to be larger, a second read fetches the rest.
const DEFAULT_PREFETCH_SIZE: u64 = 64 << 10;

/// Asynchronous loader for the footer metadata of a single parquet file.
pub(crate) struct MetadataLoader<'a> {
    /// Object store the file is read from (used for async range reads and `stat`).
    object_store: ObjectStore,
    /// Path of the parquet file within `object_store`.
    file_path: &'a str,
    /// Size of the parquet file in bytes; `0` means "unknown, ask the object store".
    file_size: u64,
}

impl<'a> MetadataLoader<'a> {
    /// Create a new parquet metadata loader.
    pub fn new(
        object_store: ObjectStore,
        file_path: &'a str,
        file_size: u64,
    ) -> MetadataLoader<'a> {
        Self {
            object_store,
            file_path,
            file_size,
        }
    }

    /// Async load the metadata of parquet file.
    ///
    /// Read [DEFAULT_PREFETCH_SIZE] from the end of parquet file at first, if File Metadata is in the
    /// read range, decode it and return [ParquetMetaData], otherwise, read again to get the rest of the metadata.
    ///
    /// Parquet File Format:
    /// ```text
    /// ┌───────────────────────────────────┐
    /// |4-byte magic number "PAR1"         |
    /// |───────────────────────────────────|
    /// |Column 1 Chunk 1 + Column Metadata |
    /// |Column 2 Chunk 1 + Column Metadata |
    /// |...                                |
    /// |Column N Chunk M + Column Metadata |
    /// |───────────────────────────────────|
    /// |File Metadata                      |
    /// |───────────────────────────────────|
    /// |4-byte length of file metadata     |
    /// |4-byte magic number "PAR1"         |
    /// └───────────────────────────────────┘
    /// ```
    ///
    /// Refer to https://github.com/apache/arrow-rs/blob/093a10e46203be1a0e94ae117854701bf58d4c79/parquet/src/arrow/async_reader/metadata.rs#L55-L106
    pub async fn load(&self) -> Result<ParquetMetaData> {
        let object_store = &self.object_store;
        let path = self.file_path;
        let file_size = self.get_file_size().await?;

        if file_size < FOOTER_SIZE as u64 {
            return error::InvalidParquetSnafu {
                file: path,
                reason: "file size is smaller than footer size",
            }
            .fail();
        }

        // Prefetch bytes for metadata from the end and process the footer
        let buffer_start = file_size.saturating_sub(DEFAULT_PREFETCH_SIZE);
        let buffer = object_store
            .read_with(path)
            .range(buffer_start..file_size)
            .await
            .context(error::OpenDalSnafu)?
            .to_vec();
        let buffer_len = buffer.len();

        let mut footer = [0; 8];
        footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);

        let metadata_len = ParquetMetaDataReader::decode_footer(&footer).map_err(|e| {
            error::InvalidParquetSnafu {
                file: path,
                reason: format!("failed to decode footer, {e}"),
            }
            .build()
        })? as u64;

        if file_size - (FOOTER_SIZE as u64) < metadata_len {
            return error::InvalidParquetSnafu {
                file: path,
                reason: format!(
                    "the sum of Metadata length {} and Footer size {} is larger than file size {}",
                    metadata_len, FOOTER_SIZE, file_size
                ),
            }
            .fail();
        }

        if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
            // The whole metadata is in the first read
            let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
            let metadata = ParquetMetaDataReader::decode_metadata(
                &buffer[metadata_start..buffer_len - FOOTER_SIZE],
            )
            .map_err(|e| {
                error::InvalidParquetSnafu {
                    file: path,
                    reason: format!("failed to decode metadata, {e}"),
                }
                .build()
            })?;
            Ok(metadata)
        } else {
            // The metadata is out of buffer, need to make a second read
            let metadata_start = file_size - metadata_len - FOOTER_SIZE as u64;
            let data = object_store
                .read_with(path)
                .range(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await
                .context(error::OpenDalSnafu)?
                .to_vec();

            let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
                error::InvalidParquetSnafu {
                    file: path,
                    reason: format!("failed to decode metadata, {e}"),
                }
                .build()
            })?;
            Ok(metadata)
        }
    }

    /// Get the size of parquet file.
    async fn get_file_size(&self) -> Result<u64> {
        let file_size = match self.file_size {
            0 => self
                .object_store
                .stat(self.file_path)
                .await
                .context(error::OpenDalSnafu)?
                .content_length(),
            other => other,
        };
        Ok(file_size)
    }
}