1mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::bloom_filter::error::{IoSnafu, Result};
29use crate::bloom_filter::SEED;
30use crate::external_provider::ExternalTempFileProvider;
31use crate::Bytes;
32
/// Builds a segmented bloom filter index: rows are grouped into fixed-size segments, the
/// distinct elements of each segment are collected, and each completed segment is handed to
/// a finalized bloom filter storage.
pub struct BloomFilterCreator {
    /// Number of rows covered by one segment; segment boundaries are where
    /// `accumulated_row_count % rows_per_segment == 0`.
    rows_per_segment: usize,

    /// Total number of rows pushed so far (across all segments).
    accumulated_row_count: usize,

    /// Distinct elements seen in the current, not yet finalized, segment.
    cur_seg_distinct_elems: HashSet<Bytes>,

    /// Sum of the byte lengths of the elements in `cur_seg_distinct_elems`; this amount is
    /// also added to `global_memory_usage` and removed again when the segment is finalized.
    cur_seg_distinct_elems_mem_usage: usize,

    /// Storage that receives the distinct elements of every finalized segment.
    finalized_bloom_filters: FinalizedBloomFilterStorage,

    /// Value of `accumulated_row_count` when the last segment was finalized; used by `finish`
    /// to detect a trailing partial segment.
    finalized_row_count: usize,

    /// Memory counter shared with the caller (and with `finalized_bloom_filters`) that tracks
    /// the bytes buffered by this creator.
    global_memory_usage: Arc<AtomicUsize>,
}
70
71impl BloomFilterCreator {
72 pub fn new(
78 rows_per_segment: usize,
79 false_positive_rate: f64,
80 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
81 global_memory_usage: Arc<AtomicUsize>,
82 global_memory_usage_threshold: Option<usize>,
83 ) -> Self {
84 assert!(
85 rows_per_segment > 0,
86 "rows_per_segment must be greater than 0"
87 );
88
89 Self {
90 rows_per_segment,
91 accumulated_row_count: 0,
92 cur_seg_distinct_elems: HashSet::default(),
93 cur_seg_distinct_elems_mem_usage: 0,
94 global_memory_usage: global_memory_usage.clone(),
95 finalized_bloom_filters: FinalizedBloomFilterStorage::new(
96 false_positive_rate,
97 intermediate_provider,
98 global_memory_usage,
99 global_memory_usage_threshold,
100 ),
101 finalized_row_count: 0,
102 }
103 }
104
105 pub async fn push_n_row_elems(
108 &mut self,
109 mut nrows: usize,
110 elems: impl IntoIterator<Item = Bytes>,
111 ) -> Result<()> {
112 if nrows == 0 {
113 return Ok(());
114 }
115 if nrows == 1 {
116 return self.push_row_elems(elems).await;
117 }
118
119 let elems = elems.into_iter().collect::<Vec<_>>();
120 while nrows > 0 {
121 let rows_to_seg_end =
122 self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
123 let rows_to_push = nrows.min(rows_to_seg_end);
124 nrows -= rows_to_push;
125
126 self.accumulated_row_count += rows_to_push;
127
128 let mut mem_diff = 0;
129 for elem in &elems {
130 let len = elem.len();
131 let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
132 if is_new {
133 mem_diff += len;
134 }
135 }
136 self.cur_seg_distinct_elems_mem_usage += mem_diff;
137 self.global_memory_usage
138 .fetch_add(mem_diff, Ordering::Relaxed);
139
140 if self.accumulated_row_count % self.rows_per_segment == 0 {
141 self.finalize_segment().await?;
142 self.finalized_row_count = self.accumulated_row_count;
143 }
144 }
145
146 Ok(())
147 }
148
149 pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
152 self.accumulated_row_count += 1;
153
154 let mut mem_diff = 0;
155 for elem in elems.into_iter() {
156 let len = elem.len();
157 let is_new = self.cur_seg_distinct_elems.insert(elem);
158 if is_new {
159 mem_diff += len;
160 }
161 }
162 self.cur_seg_distinct_elems_mem_usage += mem_diff;
163 self.global_memory_usage
164 .fetch_add(mem_diff, Ordering::Relaxed);
165
166 if self.accumulated_row_count % self.rows_per_segment == 0 {
167 self.finalize_segment().await?;
168 self.finalized_row_count = self.accumulated_row_count;
169 }
170
171 Ok(())
172 }
173
174 pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
176 if self.accumulated_row_count > self.finalized_row_count {
177 self.finalize_segment().await?;
178 }
179
180 let mut meta = BloomFilterMeta {
181 rows_per_segment: self.rows_per_segment as _,
182 row_count: self.accumulated_row_count as _,
183 ..Default::default()
184 };
185
186 let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
187 meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
188 meta.segment_count = meta.segment_loc_indices.len() as _;
189
190 while let Some(segment) = segs.next().await {
191 let segment = segment?;
192 writer
193 .write_all(&segment.bloom_filter_bytes)
194 .await
195 .context(IoSnafu)?;
196
197 let size = segment.bloom_filter_bytes.len() as u64;
198 meta.bloom_filter_locs.push(BloomFilterLoc {
199 offset: meta.bloom_filter_size as _,
200 size,
201 element_count: segment.element_count as _,
202 });
203 meta.bloom_filter_size += size;
204 }
205
206 let meta_bytes = meta.encode_to_vec();
207 writer.write_all(&meta_bytes).await.context(IoSnafu)?;
208
209 let meta_size = meta_bytes.len() as u32;
210 writer
211 .write_all(&meta_size.to_le_bytes())
212 .await
213 .context(IoSnafu)?;
214 writer.flush().await.unwrap();
215
216 Ok(())
217 }
218
219 pub fn memory_usage(&self) -> usize {
221 self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
222 }
223
224 async fn finalize_segment(&mut self) -> Result<()> {
225 let elem_count = self.cur_seg_distinct_elems.len();
226 self.finalized_bloom_filters
227 .add(self.cur_seg_distinct_elems.drain(), elem_count)
228 .await?;
229
230 self.global_memory_usage
231 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
232 self.cur_seg_distinct_elems_mem_usage = 0;
233 Ok(())
234 }
235}
236
237impl Drop for BloomFilterCreator {
238 fn drop(&mut self) {
239 self.global_memory_usage
240 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
241 }
242}
243
#[cfg(test)]
mod tests {
    use fastbloom::BloomFilter;
    use futures::io::Cursor;

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Decodes a little-endian byte buffer into `u64` words, ignoring any incomplete tail.
    pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
        const WORD_SIZE: usize = std::mem::size_of::<u64>();
        bytes
            .chunks_exact(WORD_SIZE)
            .map(|word| u64::from_le_bytes(word.try_into().unwrap()))
            .collect()
    }

    /// Creates the creator shared by all tests: 2 rows per segment, 1% false positive rate,
    /// a mock temp-file provider, a fresh memory counter and no memory threshold.
    fn new_test_creator() -> BloomFilterCreator {
        BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        )
    }

    /// Parses the footer written by `finish`: the last 4 bytes hold the little-endian length
    /// of the encoded `BloomFilterMeta` that directly precedes them.
    ///
    /// Returns the decoded meta and the length of its encoded form.
    fn decode_footer(bytes: &[u8]) -> (BloomFilterMeta, usize) {
        let size_field_start = bytes.len() - 4;
        let meta_len =
            u32::from_le_bytes((&bytes[size_field_start..]).try_into().unwrap()) as usize;
        let meta_bytes = &bytes[size_field_start - meta_len..size_field_start];
        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
        (meta, meta_bytes.len())
    }

    #[tokio::test]
    async fn test_bloom_filter_creator() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_test_creator();

        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // The second row completes the first segment, which is finalized right away.
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // The third row opens a second, partial segment.
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_len) = decode_footer(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 2);
        assert_eq!(meta.row_count, 3);
        assert_eq!(meta.bloom_filter_size as usize + meta_len + 4, bytes.len());

        let bfs = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect::<Vec<_>>();

        assert_eq!(meta.segment_loc_indices.len(), 2);

        let first = &bfs[meta.segment_loc_indices[0] as usize];
        assert!(first.contains(&b"a"));
        assert!(first.contains(&b"b"));
        assert!(first.contains(&b"c"));
        assert!(first.contains(&b"d"));

        let second = &bfs[meta.segment_loc_indices[1] as usize];
        assert!(second.contains(&b"e"));
        assert!(second.contains(&b"f"));
    }

    #[tokio::test]
    async fn test_bloom_filter_creator_batch_push() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_test_creator();

        creator
            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        creator
            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator
            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_len) = decode_footer(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 10);
        assert_eq!(meta.row_count, 20);
        assert_eq!(meta.bloom_filter_size as usize + meta_len + 4, bytes.len());

        let bfs = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect::<Vec<_>>();

        // Only 4 distinct filters exist: {a,b}, {a,b,c,d}, {c,d} and {e,f}.
        assert_eq!(bfs.len(), 4);
        assert_eq!(meta.segment_loc_indices.len(), 10);

        // Segments 0..3 hold rows pushed with a/b (segment 2 straddles into c/d).
        for idx in meta.segment_loc_indices.iter().take(3) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"a"));
            assert!(bf.contains(&b"b"));
        }
        // Segments 2..5 hold rows pushed with c/d.
        for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
        }
        // Segments 5..10 hold rows pushed with e/f.
        for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"e"));
            assert!(bf.contains(&b"f"));
        }
    }

    #[tokio::test]
    async fn test_final_seg_all_null() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_test_creator();

        creator
            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        // A trailing row with no elements still forms its own (partial) segment.
        creator.push_row_elems(Vec::new()).await.unwrap();

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, _) = decode_footer(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 3);
        assert_eq!(meta.row_count, 5);
    }
}