index/inverted_index/format/reader/
footer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_base::range_read::RangeReader;
16use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
17use prost::Message;
18use snafu::{ensure, ResultExt};
19
20use crate::inverted_index::error::{
21    BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
22    UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
23    UnexpectedZeroSegmentRowCountSnafu,
24};
25use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
26
/// Suggested prefetch size (8 KiB) to pass to [`InvertedIndexFooterReader::with_prefetch_size`].
///
/// It is not applied automatically: [`InvertedIndexFooterReader::new`] leaves the prefetch
/// size unset.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;
28
/// Reads the footer section (metadata payload plus trailing payload-size field) of an
/// inverted index blob.
#[derive(Debug)]
pub struct InvertedIndexFooterReader<R> {
    /// Source the blob bytes are read from.
    source: R,
    /// Total size of the blob in bytes.
    blob_size: u64,
    /// Number of trailing bytes to fetch in the first read; `None` means use the minimum
    /// (the size of the payload-size field).
    prefetch_size: Option<u64>,
}
35
36impl<R> InvertedIndexFooterReader<R> {
37    pub fn new(source: R, blob_size: u64) -> Self {
38        Self {
39            source,
40            blob_size,
41            prefetch_size: None,
42        }
43    }
44
45    /// Set the prefetch size for the footer reader.
46    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
47        self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
48        self
49    }
50
51    pub fn prefetch_size(&self) -> u64 {
52        self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
53    }
54}
55
56impl<R: RangeReader> InvertedIndexFooterReader<R> {
57    pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
58        ensure!(
59            self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
60            BlobSizeTooSmallSnafu
61        );
62
63        let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
64        let suffix = self
65            .source
66            .read(footer_start..self.blob_size)
67            .await
68            .context(CommonIoSnafu)?;
69        let suffix_len = suffix.len();
70        let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
71        self.validate_payload_size(length)?;
72
73        let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
74
75        // Did not fetch the entire file metadata in the initial read, need to make a second request.
76        if length > suffix_len as u64 - footer_size {
77            let metadata_start = self.blob_size - length - footer_size;
78            let meta = self
79                .source
80                .read(metadata_start..self.blob_size - footer_size)
81                .await
82                .context(CommonIoSnafu)?;
83            self.parse_payload(&meta, length)
84        } else {
85            let metadata_start = self.blob_size - length - footer_size - footer_start;
86            let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
87            self.parse_payload(meta, length)
88        }
89    }
90
91    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
92        let suffix_len = suffix.len();
93        ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
94        let mut bytes = [0; 4];
95        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
96
97        Ok(bytes)
98    }
99
100    fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
101        let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
102        self.validate_metas(&metas, payload_size)?;
103        Ok(metas)
104    }
105
106    fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
107        let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
108        ensure!(
109            payload_size <= max_payload_size,
110            UnexpectedFooterPayloadSizeSnafu {
111                max_payload_size,
112                actual_payload_size: payload_size,
113            }
114        );
115
116        Ok(())
117    }
118
119    /// Check if the read metadata is consistent with expected sizes and offsets.
120    fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
121        ensure!(
122            metas.segment_row_count > 0,
123            UnexpectedZeroSegmentRowCountSnafu
124        );
125
126        for meta in metas.metas.values() {
127            let InvertedIndexMeta {
128                base_offset,
129                inverted_index_size,
130                ..
131            } = meta;
132
133            let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size;
134            ensure!(
135                *base_offset + *inverted_index_size <= limit,
136                UnexpectedOffsetSizeSnafu {
137                    offset: *base_offset,
138                    size: *inverted_index_size,
139                    blob_size: self.blob_size,
140                    payload_size,
141                }
142            );
143        }
144
145        Ok(())
146    }
147}
148
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use prost::Message;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Encodes `metas` followed by its little-endian `u32` length. The resulting blob consists
    /// of nothing but a footer.
    fn encode_footer(metas: &InvertedIndexMetas) -> Vec<u8> {
        let mut payload_buf = vec![];
        metas.encode(&mut payload_buf).unwrap();

        let footer_payload_size = (payload_buf.len() as u32).to_le_bytes();
        payload_buf.extend_from_slice(&footer_payload_size);
        payload_buf
    }

    /// Builds a footer-only blob whose metas hold `meta` under the key `"test"`, with a
    /// segment row count of 1.
    fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
        let mut metas = InvertedIndexMetas {
            segment_row_count: 1,
            ..Default::default()
        };
        metas.metas.insert("test".to_string(), meta);
        encode_footer(&metas)
    }

    /// Prefetch sizes each test runs with: 0 (keep the default), half the blob, exactly the
    /// blob, and more than the blob.
    fn prefetch_sizes(blob_size: u64) -> [u64; 4] {
        [0, blob_size / 2, blob_size, blob_size + 10]
    }

    #[tokio::test]
    async fn test_read_payload() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
            if prefetch > 0 {
                reader = reader.with_prefetch_size(prefetch);
            }

            let metas = reader.metadata().await.unwrap();
            assert_eq!(metas.metas.len(), 1);
            let index_meta = metas.metas.get("test").unwrap();
            assert_eq!(index_meta.name, "test");
        }
    }

    #[tokio::test]
    async fn test_invalid_footer_payload_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };
        let mut payload_buf = create_test_payload(meta);
        // Append a byte so the trailing four bytes decode to an oversized payload length.
        payload_buf.push(0xff);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
            if prefetch > 0 {
                reader = reader.with_prefetch_size(prefetch);
            }

            let result = reader.metadata().await;
            assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
        }
    }

    #[tokio::test]
    async fn test_invalid_offset_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            base_offset: 0,
            // The blob holds only the footer, so any non-zero size exceeds the space before it.
            inverted_index_size: 1,
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
            if prefetch > 0 {
                reader = reader.with_prefetch_size(prefetch);
            }

            let result = reader.metadata().await;
            assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
        }
    }

    #[tokio::test]
    async fn test_blob_size_too_small() {
        let payload_buf = vec![0u8; FOOTER_PAYLOAD_SIZE_SIZE as usize - 1];
        let blob_size = payload_buf.len() as u64;

        let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
        let result = reader.metadata().await;
        assert_matches!(result, Err(Error::BlobSizeTooSmall { .. }));
    }

    #[tokio::test]
    async fn test_zero_segment_row_count() {
        let metas = InvertedIndexMetas {
            segment_row_count: 0,
            ..Default::default()
        };
        let payload_buf = encode_footer(&metas);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
            if prefetch > 0 {
                reader = reader.with_prefetch_size(prefetch);
            }

            let result = reader.metadata().await;
            assert_matches!(result, Err(Error::UnexpectedZeroSegmentRowCount { .. }));
        }
    }
}