index/inverted_index/format/reader/
footer.rs1use std::time::Instant;
16
17use common_base::range_read::RangeReader;
18use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
19use prost::Message;
20use snafu::{ResultExt, ensure};
21
22use crate::inverted_index::error::{
23 BlobSizeTooSmallSnafu, CommonIoSnafu, DecodeProtoSnafu, InvalidFooterPayloadSizeSnafu, Result,
24 UnexpectedFooterPayloadSizeSnafu, UnexpectedOffsetSizeSnafu,
25 UnexpectedZeroSegmentRowCountSnafu,
26};
27use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
28use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
29
/// Suggested prefetch size (8 KiB) for callers to pass to
/// [`InvertedIndexFooterReader::with_prefetch_size`].
///
/// It is not applied automatically: a reader built with `new` starts with no
/// prefetch size configured.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;

/// Reads and validates the metadata footer stored at the tail of an inverted
/// index blob.
pub struct InvertedIndexFooterReader<R> {
    /// Byte source the blob is read from.
    source: R,
    /// Total length of the blob, in bytes.
    blob_size: u64,
    /// Number of trailing bytes the first read should fetch, if configured.
    prefetch_size: Option<u64>,
}
38
39impl<R> InvertedIndexFooterReader<R> {
40 pub fn new(source: R, blob_size: u64) -> Self {
41 Self {
42 source,
43 blob_size,
44 prefetch_size: None,
45 }
46 }
47
48 pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self {
50 self.prefetch_size = Some(prefetch_size.max(FOOTER_PAYLOAD_SIZE_SIZE));
51 self
52 }
53
54 pub fn prefetch_size(&self) -> u64 {
55 self.prefetch_size.unwrap_or(FOOTER_PAYLOAD_SIZE_SIZE)
56 }
57}
58
59impl<R: RangeReader> InvertedIndexFooterReader<R> {
60 pub async fn metadata(
61 &mut self,
62 mut metrics: Option<&mut InvertedIndexReadMetrics>,
63 ) -> Result<InvertedIndexMetas> {
64 ensure!(
65 self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
66 BlobSizeTooSmallSnafu
67 );
68
69 let start = metrics.as_ref().map(|_| Instant::now());
70
71 let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
72 let suffix = self
73 .source
74 .read(footer_start..self.blob_size)
75 .await
76 .context(CommonIoSnafu)?;
77 let suffix_len = suffix.len();
78 let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
79 self.validate_payload_size(length)?;
80
81 let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
82
83 let result = if length > suffix_len as u64 - footer_size {
85 let metadata_start = self.blob_size - length - footer_size;
86 let meta = self
87 .source
88 .read(metadata_start..self.blob_size - footer_size)
89 .await
90 .context(CommonIoSnafu)?;
91
92 if let Some(m) = metrics.as_deref_mut() {
93 m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
94 m.total_ranges += 2;
95 }
96
97 self.parse_payload(&meta, length)
98 } else {
99 if let Some(m) = metrics.as_deref_mut() {
100 m.total_bytes += self.blob_size.min(self.prefetch_size());
101 m.total_ranges += 1;
102 }
103
104 let metadata_start = self.blob_size - length - footer_size - footer_start;
105 let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
106 self.parse_payload(meta, length)
107 };
108
109 if let Some(m) = metrics {
110 m.fetch_elapsed += start.unwrap().elapsed();
111 }
112
113 result
114 }
115
116 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
117 let suffix_len = suffix.len();
118 ensure!(suffix_len >= 4, InvalidFooterPayloadSizeSnafu);
119 let mut bytes = [0; 4];
120 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
121
122 Ok(bytes)
123 }
124
125 fn parse_payload(&mut self, bytes: &[u8], payload_size: u64) -> Result<InvertedIndexMetas> {
126 let metas = InvertedIndexMetas::decode(bytes).context(DecodeProtoSnafu)?;
127 self.validate_metas(&metas, payload_size)?;
128 Ok(metas)
129 }
130
131 fn validate_payload_size(&self, payload_size: u64) -> Result<()> {
132 let max_payload_size = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE;
133 ensure!(
134 payload_size <= max_payload_size,
135 UnexpectedFooterPayloadSizeSnafu {
136 max_payload_size,
137 actual_payload_size: payload_size,
138 }
139 );
140
141 Ok(())
142 }
143
144 fn validate_metas(&self, metas: &InvertedIndexMetas, payload_size: u64) -> Result<()> {
146 ensure!(
147 metas.segment_row_count > 0,
148 UnexpectedZeroSegmentRowCountSnafu
149 );
150
151 for meta in metas.metas.values() {
152 let InvertedIndexMeta {
153 base_offset,
154 inverted_index_size,
155 ..
156 } = meta;
157
158 let limit = self.blob_size - FOOTER_PAYLOAD_SIZE_SIZE - payload_size;
159 ensure!(
160 *base_offset + *inverted_index_size <= limit,
161 UnexpectedOffsetSizeSnafu {
162 offset: *base_offset,
163 size: *inverted_index_size,
164 blob_size: self.blob_size,
165 payload_size,
166 }
167 );
168 }
169
170 Ok(())
171 }
172}
173
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use prost::Message;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Encodes a single-entry `InvertedIndexMetas` followed by its little-endian u32 length.
    fn create_test_payload(meta: InvertedIndexMeta) -> Vec<u8> {
        let mut metas = InvertedIndexMetas {
            segment_row_count: 1,
            ..Default::default()
        };
        metas.metas.insert("test".to_string(), meta);

        let mut payload_buf = vec![];
        metas.encode(&mut payload_buf).unwrap();

        let footer_payload_size = (payload_buf.len() as u32).to_le_bytes().to_vec();
        payload_buf.extend_from_slice(&footer_payload_size);
        payload_buf
    }

    /// Prefetch sizes exercised by each test.
    ///
    /// `0` means "leave the default"; the others cover a partial, an exact and an
    /// oversized prefetch.
    fn prefetch_sizes(blob_size: u64) -> [u64; 4] {
        [0, blob_size / 2, blob_size, blob_size + 10]
    }

    /// Builds a reader over `source`, applying `prefetch` only when it is non-zero.
    fn reader_with_prefetch<R>(
        source: R,
        blob_size: u64,
        prefetch: u64,
    ) -> InvertedIndexFooterReader<R> {
        let reader = InvertedIndexFooterReader::new(source, blob_size);
        if prefetch > 0 {
            reader.with_prefetch_size(prefetch)
        } else {
            reader
        }
    }

    #[tokio::test]
    async fn test_read_payload() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = reader_with_prefetch(&payload_buf, blob_size, prefetch);

            let metas = reader.metadata(None).await.unwrap();
            assert_eq!(metas.metas.len(), 1);
            let index_meta = &metas.metas.get("test").unwrap();
            assert_eq!(index_meta.name, "test");
        }
    }

    #[tokio::test]
    async fn test_invalid_footer_payload_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            ..Default::default()
        };
        let mut payload_buf = create_test_payload(meta);
        // Corrupt the trailing length so it claims more bytes than the blob holds.
        payload_buf.push(0xff);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = reader_with_prefetch(&payload_buf, blob_size, prefetch);

            let result = reader.metadata(None).await;
            assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
        }
    }

    #[tokio::test]
    async fn test_invalid_offset_size() {
        let meta = InvertedIndexMeta {
            name: "test".to_string(),
            base_offset: 0,
            // The blob holds no index data, so any non-zero size is out of bounds.
            inverted_index_size: 1,
            ..Default::default()
        };

        let payload_buf = create_test_payload(meta);
        let blob_size = payload_buf.len() as u64;

        for prefetch in prefetch_sizes(blob_size) {
            let mut reader = reader_with_prefetch(&payload_buf, blob_size, prefetch);

            let result = reader.metadata(None).await;
            assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
        }
    }

    #[tokio::test]
    async fn test_blob_size_too_small() {
        // Shorter than the trailing payload-size field.
        let payload_buf = vec![0u8; 3];
        let blob_size = payload_buf.len() as u64;

        let mut reader = InvertedIndexFooterReader::new(&payload_buf, blob_size);
        let result = reader.metadata(None).await;
        assert_matches!(result, Err(Error::BlobSizeTooSmall { .. }));
    }
}