index/bloom_filter/
creator.rs1mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::bloom_filter::error::{IoSnafu, Result};
29use crate::bloom_filter::SEED;
30use crate::external_provider::ExternalTempFileProvider;
31use crate::Bytes;
32
/// Target false-positive rate for the bloom filters.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed where the per-segment filters are built (e.g. the
/// `finalize_segment` module) — confirm against the callers.
pub const FALSE_POSITIVE_RATE: f64 = 0.01;
35
/// Builds bloom filters over rows of byte-string elements, one filter per
/// segment of `rows_per_segment` rows.
///
/// When [`BloomFilterCreator::finish`] is called, the following layout is
/// written to the writer:
///
/// ```text
/// | bloom filter bytes ... | BloomFilterMeta (protobuf) | meta size (u32, little-endian) |
/// ```
pub struct BloomFilterCreator {
    /// Number of rows grouped into one segment; `new` asserts it is non-zero.
    rows_per_segment: usize,

    /// Total number of rows pushed so far (written as `row_count` in the meta).
    accumulated_row_count: usize,

    /// Distinct elements seen in the current, not yet finalized, segment.
    cur_seg_distinct_elems: HashSet<Bytes>,

    /// Sum of the byte lengths of the elements in `cur_seg_distinct_elems`.
    cur_seg_distinct_elems_mem_usage: usize,

    /// Storage that receives the elements of each finalized segment.
    finalized_bloom_filters: FinalizedBloomFilterStorage,

    /// Value of `accumulated_row_count` when a segment was last finalized on a
    /// segment boundary; `finish` compares against it to detect a trailing
    /// partial segment.
    finalized_row_count: usize,

    /// Shared memory counter; this creator adds/subtracts the bytes of the
    /// current segment's distinct elements. The same counter is also handed to
    /// `finalized_bloom_filters`.
    global_memory_usage: Arc<AtomicUsize>,
}
73
74impl BloomFilterCreator {
75 pub fn new(
81 rows_per_segment: usize,
82 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
83 global_memory_usage: Arc<AtomicUsize>,
84 global_memory_usage_threshold: Option<usize>,
85 ) -> Self {
86 assert!(
87 rows_per_segment > 0,
88 "rows_per_segment must be greater than 0"
89 );
90
91 Self {
92 rows_per_segment,
93 accumulated_row_count: 0,
94 cur_seg_distinct_elems: HashSet::default(),
95 cur_seg_distinct_elems_mem_usage: 0,
96 global_memory_usage: global_memory_usage.clone(),
97 finalized_bloom_filters: FinalizedBloomFilterStorage::new(
98 intermediate_provider,
99 global_memory_usage,
100 global_memory_usage_threshold,
101 ),
102 finalized_row_count: 0,
103 }
104 }
105
106 pub async fn push_n_row_elems(
109 &mut self,
110 mut nrows: usize,
111 elems: impl IntoIterator<Item = Bytes>,
112 ) -> Result<()> {
113 if nrows == 0 {
114 return Ok(());
115 }
116 if nrows == 1 {
117 return self.push_row_elems(elems).await;
118 }
119
120 let elems = elems.into_iter().collect::<Vec<_>>();
121 while nrows > 0 {
122 let rows_to_seg_end =
123 self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
124 let rows_to_push = nrows.min(rows_to_seg_end);
125 nrows -= rows_to_push;
126
127 self.accumulated_row_count += rows_to_push;
128
129 let mut mem_diff = 0;
130 for elem in &elems {
131 let len = elem.len();
132 let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
133 if is_new {
134 mem_diff += len;
135 }
136 }
137 self.cur_seg_distinct_elems_mem_usage += mem_diff;
138 self.global_memory_usage
139 .fetch_add(mem_diff, Ordering::Relaxed);
140
141 if self.accumulated_row_count % self.rows_per_segment == 0 {
142 self.finalize_segment().await?;
143 self.finalized_row_count = self.accumulated_row_count;
144 }
145 }
146
147 Ok(())
148 }
149
150 pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
153 self.accumulated_row_count += 1;
154
155 let mut mem_diff = 0;
156 for elem in elems.into_iter() {
157 let len = elem.len();
158 let is_new = self.cur_seg_distinct_elems.insert(elem);
159 if is_new {
160 mem_diff += len;
161 }
162 }
163 self.cur_seg_distinct_elems_mem_usage += mem_diff;
164 self.global_memory_usage
165 .fetch_add(mem_diff, Ordering::Relaxed);
166
167 if self.accumulated_row_count % self.rows_per_segment == 0 {
168 self.finalize_segment().await?;
169 self.finalized_row_count = self.accumulated_row_count;
170 }
171
172 Ok(())
173 }
174
175 pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
177 if self.accumulated_row_count > self.finalized_row_count {
178 self.finalize_segment().await?;
179 }
180
181 let mut meta = BloomFilterMeta {
182 rows_per_segment: self.rows_per_segment as _,
183 row_count: self.accumulated_row_count as _,
184 ..Default::default()
185 };
186
187 let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
188 meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
189 meta.segment_count = meta.segment_loc_indices.len() as _;
190
191 while let Some(segment) = segs.next().await {
192 let segment = segment?;
193 writer
194 .write_all(&segment.bloom_filter_bytes)
195 .await
196 .context(IoSnafu)?;
197
198 let size = segment.bloom_filter_bytes.len() as u64;
199 meta.bloom_filter_locs.push(BloomFilterLoc {
200 offset: meta.bloom_filter_size as _,
201 size,
202 element_count: segment.element_count as _,
203 });
204 meta.bloom_filter_size += size;
205 }
206
207 let meta_bytes = meta.encode_to_vec();
208 writer.write_all(&meta_bytes).await.context(IoSnafu)?;
209
210 let meta_size = meta_bytes.len() as u32;
211 writer
212 .write_all(&meta_size.to_le_bytes())
213 .await
214 .context(IoSnafu)?;
215 writer.flush().await.unwrap();
216
217 Ok(())
218 }
219
220 pub fn memory_usage(&self) -> usize {
222 self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
223 }
224
225 async fn finalize_segment(&mut self) -> Result<()> {
226 let elem_count = self.cur_seg_distinct_elems.len();
227 self.finalized_bloom_filters
228 .add(self.cur_seg_distinct_elems.drain(), elem_count)
229 .await?;
230
231 self.global_memory_usage
232 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
233 self.cur_seg_distinct_elems_mem_usage = 0;
234 Ok(())
235 }
236}
237
238impl Drop for BloomFilterCreator {
239 fn drop(&mut self) {
240 self.global_memory_usage
241 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
242 }
243}
244
#[cfg(test)]
mod tests {
    use fastbloom::BloomFilter;
    use futures::io::Cursor;

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Decodes `bytes` as consecutive little-endian `u64` words; trailing bytes
    /// that do not fill a whole word are ignored.
    pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
        const WORD_SIZE: usize = std::mem::size_of::<u64>();
        let mut words = Vec::with_capacity(bytes.len() / WORD_SIZE);
        for chunk in bytes.chunks_exact(WORD_SIZE) {
            let mut word = [0u8; WORD_SIZE];
            word.copy_from_slice(chunk);
            words.push(u64::from_le_bytes(word));
        }
        words
    }

    /// Creates a creator with `rows_per_segment` rows per segment, a mock
    /// temp-file provider, a fresh global memory counter and no threshold.
    fn new_creator(rows_per_segment: usize) -> BloomFilterCreator {
        BloomFilterCreator::new(
            rows_per_segment,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        )
    }

    /// Reads the footer written by `finish`: the trailing 4-byte little-endian
    /// meta size and the encoded `BloomFilterMeta` right before it. Returns the
    /// decoded meta together with its raw encoded bytes.
    fn decode_meta(bytes: &[u8]) -> (BloomFilterMeta, &[u8]) {
        let meta_size_offset = bytes.len() - 4;
        let meta_size =
            u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap()) as usize;
        let meta_bytes = &bytes[meta_size_offset - meta_size..meta_size_offset];
        (BloomFilterMeta::decode(meta_bytes).unwrap(), meta_bytes)
    }

    #[tokio::test]
    async fn test_bloom_filter_creator() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_creator(2);

        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // The second row completes the first segment, which is finalized.
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_bytes) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 2);
        assert_eq!(meta.row_count, 3);
        assert_eq!(
            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
            bytes.len()
        );

        let bfs: Vec<_> = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect();

        assert_eq!(meta.segment_loc_indices.len(), 2);

        let first = &bfs[meta.segment_loc_indices[0] as usize];
        assert!(first.contains(&b"a"));
        assert!(first.contains(&b"b"));
        assert!(first.contains(&b"c"));
        assert!(first.contains(&b"d"));

        let second = &bfs[meta.segment_loc_indices[1] as usize];
        assert!(second.contains(&b"e"));
        assert!(second.contains(&b"f"));
    }

    #[tokio::test]
    async fn test_bloom_filter_creator_batch_push() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_creator(2);

        creator
            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        creator
            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator
            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_bytes) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 10);
        assert_eq!(meta.row_count, 20);
        assert_eq!(
            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
            bytes.len()
        );

        let bfs: Vec<_> = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect();

        // Ten segments share four distinct filters:
        // {a,b}, {a,b,c,d}, {c,d} and {e,f}.
        assert_eq!(bfs.len(), 4);
        assert_eq!(meta.segment_loc_indices.len(), 10);

        // Segments 0..3 hold rows pushed with a/b (segment 2 is mixed).
        for &idx in &meta.segment_loc_indices[..3] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"a"));
            assert!(bf.contains(&b"b"));
        }
        // Segments 2..5 hold rows pushed with c/d.
        for &idx in &meta.segment_loc_indices[2..5] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
        }
        // Segments 5..10 hold rows pushed with e/f.
        for &idx in &meta.segment_loc_indices[5..10] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"e"));
            assert!(bf.contains(&b"f"));
        }
    }

    #[tokio::test]
    async fn test_final_seg_all_null() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_creator(2);

        // Four rows fill two segments; the fifth row has no elements but
        // still forms a third (partial) segment that `finish` must emit.
        creator
            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        creator.push_row_elems(Vec::new()).await.unwrap();

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, _) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 3);
        assert_eq!(meta.row_count, 5);
    }
}