index/inverted_index/format/reader/
footer.rs1use common_base::range_read::RangeReader;
16use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
17use prost::Message;
18use snafu::{ensure, ResultExt};
19
20use crate::inverted_index::error::{
21 BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
22 UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
23 UnexpectedZeroSegmentRowCountSnafu,
24};
25use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
26
/// Suggested prefetch size (8 KiB) for [`InvertedIndexFooterReader::with_prefetch_size`].
///
/// It is not applied by [`InvertedIndexFooterReader::new`]; callers opt in explicitly.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;

/// Reads the footer of an inverted index blob: a protobuf-encoded metadata payload
/// followed by its size stored in the blob's last four bytes (little-endian `u32`).
pub struct InvertedIndexFooterReader<R> {
    /// Reader over the blob's bytes.
    source: R,
    /// Total length of the blob in bytes.
    blob_size: u64,
    /// How many trailing bytes the first read fetches; `None` until configured.
    prefetch_size: Option<u64>,
}
35
36impl<R> InvertedIndexFooterReader<R> {
37 pub fn new(source: R, blob_size: u64) -> Self {
38 Self {
39 source,
40 blob_size,
41 prefetch_size: None,
42 }
43 }
44
45 pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
47 self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
48 self
49 }
50
51 pub fn prefetch_size(&self) -> u64 {
52 self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
53 }
54}
55
56impl<R: RangeReader> InvertedIndexFooterReader<R> {
57 pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
58 ensure!(
59 self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
60 BlobSizeTooSmallSnafu
61 );
62
63 let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
64 let suffix = self
65 .source
66 .read(footer_start..self.blob_size)
67 .await
68 .context(CommonIoSnafu)?;
69 let suffix_len = suffix.len();
70 let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
71 self.validate_payload_size(length)?;
72
73 let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
74
75 if length > suffix_len as u64 - footer_size {
77 let metadata_start = self.blob_size - length - footer_size;
78 let meta = self
79 .source
80 .read(metadata_start..self.blob_size - footer_size)
81 .await
82 .context(CommonIoSnafu)?;
83 self.parse_payload(&meta, length)
84 } else {
85 let metadata_start = self.blob_size - length - footer_size - footer_start;
86 let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
87 self.parse_payload(meta, length)
88 }
89 }
90
91 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
92 let suffix_len = suffix.len();
93 ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
94 let mut bytes = [0; 4];
95 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
96
97 Ok(bytes)
98 }
99
100 fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
101 let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
102 self.validate_metas(&metas, payload_size)?;
103 Ok(metas)
104 }
105
106 fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
107 let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
108 ensure!(
109 payload_size <= max_payload_size,
110 UnexpectedFooterPayloadSizeSnafu {
111 max_payload_size,
112 actual_payload_size: payload_size,
113 }
114 );
115
116 Ok(())
117 }
118
119 fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
121 ensure!(
122 metas.segment_row_count > 0,
123 UnexpectedZeroSegmentRowCountSnafu
124 );
125
126 for meta in metas.metas.values() {
127 let InvertedIndexMeta {
128 base_offset,
129 inverted_index_size,
130 ..
131 } = meta;
132
133 let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size;
134 ensure!(
135 *base_offset + *inverted_index_size <= limit,
136 UnexpectedOffsetSizeSnafu {
137 offset: *base_offset,
138 size: *inverted_index_size,
139 blob_size: self.blob_size,
140 payload_size,
141 }
142 );
143 }
144
145 Ok(())
146 }
147}
148
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use prost::Message;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Encodes an `InvertedIndexMetas` holding `meta` under the key `"test"` (with
    /// `segment_row_count = 1`) and appends the encoded length as a 4-byte little-endian
    /// footer.
    fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
        let mut metas = InvertedIndexMetas {
            segment_row_count: 1,
            ..Default::default()
        };
        metas.metas.insert("test".to_string(), meta);

        let mut payload_buf = vec![];
        metas.encode(&mut payload_buf).unwrap();

        let footer_payload_size = (payload_buf.len() as u32).to_le_bytes();
        payload_buf.extend_from_slice(&footer_payload_size);
        payload_buf
    }

    /// Prefetch sizes every test runs with: `0` (keep the reader's default), half the blob,
    /// exactly the blob, and more than the blob. Together they cover both the two-read and
    /// the single-read paths of `metadata`.
    fn prefetch_sizes(blob_size: u64) -> [u64; 4] {
        [0, blob_size / 2, blob_size, blob_size + 10]
    }

    /// Builds a reader over `source`, overriding the prefetch size unless `prefetch` is 0.
    fn new_reader<R>(source: R, blob_size: u64, prefetch: u64) -> InvertedIndexFooterReader<R> {
        let reader = InvertedIndexFooterReader::new(source, blob_size);
        if prefetch > 0 {
            reader.with_prefetch_size(prefetch)
        } else {
            reader
        }
    }

    #[tokio::test]
    async fn test_read_payload() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = new_reader(&payload_buf, blob_size, prefetch);

            let metas = reader.metadata().await.unwrap();
            assert_eq!(metas.metas.len(), 1);
            let index_meta = metas.metas.get("test").unwrap();
            assert_eq!(index_meta.name, "test");
        }
    }

    #[tokio::test]
    async fn test_invalid_footer_payload_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };
        let mut payload_buf = create_test_payload(meta);
        // A stray trailing byte shifts the footer: the last four bytes now decode (LE) to a
        // value whose top byte is 0xff, far larger than the blob.
        payload_buf.push(0xff);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = new_reader(&payload_buf, blob_size, prefetch);

            let result = reader.metadata().await;
            assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
        }
    }

    #[tokio::test]
    async fn test_invalid_offset_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            base_offset: 0,
            // The blob holds only the footer, so any non-empty index range is out of bounds.
            inverted_index_size: 1,
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = new_reader(&payload_buf, blob_size, prefetch);

            let result = reader.metadata().await;
            assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
        }
    }
}