index/inverted_index/format/reader/
footer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_base::range_read::RangeReader;
18use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
19use prost::Message;
20use snafu::{ResultExt, ensure};
21
22use crate::inverted_index::error::{
23    BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
24    UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
25    UnexpectedZeroSegmentRowCountSnafu,
26};
27use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
28use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
29
/// Suggested prefetch size (8 KiB) for footer reads.
///
/// NOTE(review): nothing in this file applies it automatically (`InvertedIndexFooterReader::new`
/// leaves the prefetch size unset); presumably callers pass it to
/// `InvertedIndexFooterReader::with_prefetch_size` — confirm at the call sites.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
31
/// InvertedIndexFooterReader is for reading the footer section of the blob.
///
/// The blob ends with a little-endian `u32` payload size, which is preceded by the
/// protobuf-encoded `InvertedIndexMetas` payload of that size.
pub struct InvertedIndexFooterReader<R> {
    /// Source the blob bytes are read from.
    source: R,
    /// Total size of the blob in bytes; all footer offsets are derived from this end position.
    blob_size: u64,
    /// Number of trailing bytes to fetch with the first read. `None` means only the
    /// payload-size field (`FOOTER_PAYLOAD_SIZE_SIZE` bytes) is fetched up front.
    prefetch_size: Option<u64>,
}
38
39impl<R> InvertedIndexFooterReader<R> {
40    pub fn new(source: R, blob_size: u64) -> Self {
41        Self {
42            source,
43            blob_size,
44            prefetch_size: None,
45        }
46    }
47
48    /// Set the prefetch size for the footer reader.
49    pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
50        self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
51        self
52    }
53
54    pub fn prefetch_size(&self) -> u64 {
55        self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
56    }
57}
58
59impl<R: RangeReader> InvertedIndexFooterReader<R> {
60    pub async fn metadata(
61        &mut self,
62        mut metrics: Option<&mut InvertedIndexReadMetrics>,
63    ) -> Result<InvertedIndexMetas> {
64        ensure!(
65            self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
66            BlobSizeTooSmallSnafu
67        );
68
69        let start = metrics.as_ref().map(|_| Instant::now());
70
71        let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
72        let suffix = self
73            .source
74            .read(footer_start..self.blob_size)
75            .await
76            .context(CommonIoSnafu)?;
77        let suffix_len = suffix.len();
78        let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
79        self.validate_payload_size(length)?;
80
81        let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
82
83        // Did not fetch the entire file metadata in the initial read, need to make a second request.
84        let result = if length > suffix_len as u64 - footer_size {
85            let metadata_start = self.blob_size - length - footer_size;
86            let meta = self
87                .source
88                .read(metadata_start..self.blob_size - footer_size)
89                .await
90                .context(CommonIoSnafu)?;
91
92            if let Some(m) = metrics.as_deref_mut() {
93                m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
94                m.total_ranges += 2;
95            }
96
97            self.parse_payload(&meta, length)
98        } else {
99            if let Some(m) = metrics.as_deref_mut() {
100                m.total_bytes += self.blob_size.min(self.prefetch_size());
101                m.total_ranges += 1;
102            }
103
104            let metadata_start = self.blob_size - length - footer_size - footer_start;
105            let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
106            self.parse_payload(meta, length)
107        };
108
109        if let Some(m) = metrics {
110            m.fetch_elapsed += start.unwrap().elapsed();
111        }
112
113        result
114    }
115
116    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
117        let suffix_len = suffix.len();
118        ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
119        let mut bytes = [0; 4];
120        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
121
122        Ok(bytes)
123    }
124
125    fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
126        let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
127        self.validate_metas(&metas, payload_size)?;
128        Ok(metas)
129    }
130
131    fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
132        let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
133        ensure!(
134            payload_size <= max_payload_size,
135            UnexpectedFooterPayloadSizeSnafu {
136                max_payload_size,
137                actual_payload_size: payload_size,
138            }
139        );
140
141        Ok(())
142    }
143
144    /// Check if the read metadata is consistent with expected sizes and offsets.
145    fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
146        ensure!(
147            metas.segment_row_count > 0,
148            UnexpectedZeroSegmentRowCountSnafu
149        );
150
151        for meta in metas.metas.values() {
152            let InvertedIndexMeta {
153                base_offset,
154                inverted_index_size,
155                ..
156            } = meta;
157
158            let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size;
159            ensure!(
160                *base_offset + *inverted_index_size <= limit,
161                UnexpectedOffsetSizeSnafu {
162                    offset: *base_offset,
163                    size: *inverted_index_size,
164                    blob_size: self.blob_size,
165                    payload_size,
166                }
167            );
168        }
169
170        Ok(())
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use std::assert_matches::assert_matches;
177
178    use prost::Message;
179
180    use super::*;
181    use crate::inverted_index::error::Error;
182
183    fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
184        let mut metas = InvertedIndexMetas {
185            segment_row_count: 1,
186            ..Default::default()
187        };
188        metas.metas.insert("test".to_string(), meta);
189
190        let mut payload_buf = vec![];
191        metas.encode(&mut payload_buf).unwrap();
192
193        let footer_payload_size = (payload_buf.len() as u32).to_le_bytes().to_vec();
194        payload_buf.extend_from_slice(&footer_payload_size);
195        payload_buf
196    }
197
198    #[tokio::test]
199    async fn test_read_payload() {
200        let meta = InvertedIndexMeta {
201            name: "test".to_string(),
202            ..Default::default()
203        };
204
205        let payload_buf = create_test_payload(meta);
206        let blob_size = payload_buf.len() as u64;
207
208        for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
209            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
210            if prefetch > 0 {
211                reader = reader.with_prefetch_size(prefetch);
212            }
213
214            let metas = reader.metadata(None).await.unwrap();
215            assert_eq!(metas.metas.len(), 1);
216            let index_meta = &metas.metas.get("test").unwrap();
217            assert_eq!(index_meta.name, "test");
218        }
219    }
220
221    #[tokio::test]
222    async fn test_invalid_footer_payload_size() {
223        let meta = InvertedIndexMeta {
224            name: "test".to_string(),
225            ..Default::default()
226        };
227        let mut payload_buf = create_test_payload(meta);
228        payload_buf.push(0xff); // Add an extra byte to corrupt the footer
229        let blob_size = payload_buf.len() as u64;
230
231        for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
232            let blob_size = payload_buf.len() as u64;
233            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
234            if prefetch > 0 {
235                reader = reader.with_prefetch_size(prefetch);
236            }
237
238            let result = reader.metadata(None).await;
239            assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
240        }
241    }
242
243    #[tokio::test]
244    async fn test_invalid_offset_size() {
245        let meta = InvertedIndexMeta {
246            name: "test".to_string(),
247            base_offset: 0,
248            inverted_index_size: 1, // Set size to 1 to make ecceed the blob size
249            ..Default::default()
250        };
251
252        let payload_buf = create_test_payload(meta);
253        let blob_size = payload_buf.len() as u64;
254
255        for prefetch in [0, blob_size / 2, blob_size, blob_size + 10] {
256            let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
257            if prefetch > 0 {
258                reader = reader.with_prefetch_size(prefetch);
259            }
260
261            let result = reader.metadata(None).await;
262            assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
263        }
264    }
265}