1mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::Bytes;
29use crate::bloom_filter::SEED;
30use crate::bloom_filter::error::{IoSnafu, Result};
31use crate::external_provider::ExternalTempFileProvider;
32
/// Builds bloom filters segment by segment over a stream of rows and, in
/// `finish`, serializes them followed by their metadata.
pub struct BloomFilterCreator {
    /// Number of rows covered by one segment; `new` asserts it is non-zero.
    rows_per_segment: usize,

    /// Total number of rows pushed so far.
    accumulated_row_count: usize,

    /// Distinct elements seen in the segment that is currently being built.
    cur_seg_distinct_elems: HashSet<Bytes>,

    /// Sum of the byte lengths of the elements held in `cur_seg_distinct_elems`.
    cur_seg_distinct_elems_mem_usage: usize,

    /// Storage that receives the elements of every segment once it is finalized.
    finalized_bloom_filters: FinalizedBloomFilterStorage,

    /// Value of `accumulated_row_count` at the time the last full segment was
    /// finalized by one of the push methods.
    finalized_row_count: usize,

    /// Memory counter shared with the finalized storage. This creator adds the
    /// bytes of newly inserted distinct elements and subtracts them again when a
    /// segment is finalized or the creator is dropped.
    global_memory_usage: Arc<AtomicUsize>,
}
70
71impl BloomFilterCreator {
72 pub fn new(
78 rows_per_segment: usize,
79 false_positive_rate: f64,
80 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
81 global_memory_usage: Arc<AtomicUsize>,
82 global_memory_usage_threshold: Option<usize>,
83 ) -> Self {
84 assert!(
85 rows_per_segment > 0,
86 "rows_per_segment must be greater than 0"
87 );
88
89 Self {
90 rows_per_segment,
91 accumulated_row_count: 0,
92 cur_seg_distinct_elems: HashSet::default(),
93 cur_seg_distinct_elems_mem_usage: 0,
94 global_memory_usage: global_memory_usage.clone(),
95 finalized_bloom_filters: FinalizedBloomFilterStorage::new(
96 false_positive_rate,
97 intermediate_provider,
98 global_memory_usage,
99 global_memory_usage_threshold,
100 ),
101 finalized_row_count: 0,
102 }
103 }
104
105 pub async fn push_n_row_elems(
108 &mut self,
109 mut nrows: usize,
110 elems: impl IntoIterator<Item = Bytes>,
111 ) -> Result<()> {
112 if nrows == 0 {
113 return Ok(());
114 }
115 if nrows == 1 {
116 return self.push_row_elems(elems).await;
117 }
118
119 let elems = elems.into_iter().collect::<Vec<_>>();
120 while nrows > 0 {
121 let rows_to_seg_end =
122 self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
123 let rows_to_push = nrows.min(rows_to_seg_end);
124 nrows -= rows_to_push;
125
126 self.accumulated_row_count += rows_to_push;
127
128 let mut mem_diff = 0;
129 for elem in &elems {
130 let len = elem.len();
131 let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
132 if is_new {
133 mem_diff += len;
134 }
135 }
136 self.cur_seg_distinct_elems_mem_usage += mem_diff;
137 self.global_memory_usage
138 .fetch_add(mem_diff, Ordering::Relaxed);
139
140 if self
141 .accumulated_row_count
142 .is_multiple_of(self.rows_per_segment)
143 {
144 self.finalize_segment().await?;
145 self.finalized_row_count = self.accumulated_row_count;
146 }
147 }
148
149 Ok(())
150 }
151
152 pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
155 self.accumulated_row_count += 1;
156
157 let mut mem_diff = 0;
158 for elem in elems.into_iter() {
159 let len = elem.len();
160 let is_new = self.cur_seg_distinct_elems.insert(elem);
161 if is_new {
162 mem_diff += len;
163 }
164 }
165 self.cur_seg_distinct_elems_mem_usage += mem_diff;
166 self.global_memory_usage
167 .fetch_add(mem_diff, Ordering::Relaxed);
168
169 if self
170 .accumulated_row_count
171 .is_multiple_of(self.rows_per_segment)
172 {
173 self.finalize_segment().await?;
174 self.finalized_row_count = self.accumulated_row_count;
175 }
176
177 Ok(())
178 }
179
180 pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
182 if self.accumulated_row_count > self.finalized_row_count {
183 self.finalize_segment().await?;
184 }
185
186 let mut meta = BloomFilterMeta {
187 rows_per_segment: self.rows_per_segment as _,
188 row_count: self.accumulated_row_count as _,
189 ..Default::default()
190 };
191
192 let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
193 meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
194 meta.segment_count = meta.segment_loc_indices.len() as _;
195
196 while let Some(segment) = segs.next().await {
197 let segment = segment?;
198 writer
199 .write_all(&segment.bloom_filter_bytes)
200 .await
201 .context(IoSnafu)?;
202
203 let size = segment.bloom_filter_bytes.len() as u64;
204 meta.bloom_filter_locs.push(BloomFilterLoc {
205 offset: meta.bloom_filter_size as _,
206 size,
207 element_count: segment.element_count as _,
208 });
209 meta.bloom_filter_size += size;
210 }
211
212 let meta_bytes = meta.encode_to_vec();
213 writer.write_all(&meta_bytes).await.context(IoSnafu)?;
214
215 let meta_size = meta_bytes.len() as u32;
216 writer
217 .write_all(&meta_size.to_le_bytes())
218 .await
219 .context(IoSnafu)?;
220 writer.flush().await.unwrap();
221
222 Ok(())
223 }
224
225 pub fn memory_usage(&self) -> usize {
227 self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
228 }
229
230 async fn finalize_segment(&mut self) -> Result<()> {
231 let elem_count = self.cur_seg_distinct_elems.len();
232 self.finalized_bloom_filters
233 .add(self.cur_seg_distinct_elems.drain(), elem_count)
234 .await?;
235
236 self.global_memory_usage
237 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
238 self.cur_seg_distinct_elems_mem_usage = 0;
239 Ok(())
240 }
241}
242
243impl Drop for BloomFilterCreator {
244 fn drop(&mut self) {
245 self.global_memory_usage
246 .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use fastbloom::BloomFilter;
253 use futures::io::Cursor;
254
255 use super::*;
256 use crate::external_provider::MockExternalTempFileProvider;
257
258 pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
260 bytes
261 .chunks_exact(std::mem::size_of::<u64>())
262 .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
263 .collect()
264 }
265
    /// Pushes three single rows with two rows per segment, then checks the
    /// memory accounting after each push and the layout of the serialized output.
    #[tokio::test]
    async fn test_bloom_filter_creator() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Row 1 of 2: the segment is still open, so its elements are accounted.
        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // Row 2 of 2: the segment is full and gets finalized, which resets the
        // current-segment usage.
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        // The finalized storage is expected to still report non-zero usage.
        assert!(creator.memory_usage() > 0);

        // Row 3 opens a second segment.
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // `finish` finalizes the half-filled second segment before writing.
        creator.finish(&mut writer).await.unwrap();

        // The last 4 bytes hold the little-endian size of the meta, which sits
        // directly in front of them.
        let bytes = writer.into_inner();
        let total_size = bytes.len();
        let meta_size_offset = total_size - 4;
        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());

        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 2);
        assert_eq!(meta.row_count, 3);
        // Filters + meta + 4-byte footer account for the whole output.
        assert_eq!(
            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
            total_size
        );

        // Rebuild every stored bloom filter from its location entry.
        let mut bfs = Vec::new();
        for segment in meta.bloom_filter_locs {
            let bloom_filter_bytes =
                &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
            let v = u64_vec_from_bytes(bloom_filter_bytes);
            let bloom_filter = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count as usize);
            bfs.push(bloom_filter);
        }

        assert_eq!(meta.segment_loc_indices.len(), 2);

        // Segment 0 covers rows 1-2, segment 1 covers row 3.
        let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
        assert!(bf0.contains(&b"a"));
        assert!(bf0.contains(&b"b"));
        assert!(bf0.contains(&b"c"));
        assert!(bf0.contains(&b"d"));

        let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
        assert!(bf1.contains(&b"e"));
        assert!(bf1.contains(&b"f"));
    }
340
    /// Pushes 20 rows in three batches with two rows per segment and checks that
    /// batches crossing segment boundaries land in the right segments.
    #[tokio::test]
    async fn test_bloom_filter_creator_batch_push() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator: BloomFilterCreator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Rows 0..5: the odd row count leaves segment 2 half-filled.
        creator
            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // Rows 5..10: ends exactly on a segment boundary, so nothing is pending.
        creator
            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // Rows 10..20: again ends on a segment boundary.
        creator
            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        // The last 4 bytes hold the little-endian size of the meta, which sits
        // directly in front of them.
        let bytes = writer.into_inner();
        let total_size = bytes.len();
        let meta_size_offset = total_size - 4;
        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());

        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 10);
        assert_eq!(meta.row_count, 20);
        assert_eq!(
            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
            total_size
        );

        // Rebuild every stored bloom filter from its location entry.
        let mut bfs = Vec::new();
        for segment in meta.bloom_filter_locs {
            let bloom_filter_bytes =
                &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
            let v = u64_vec_from_bytes(bloom_filter_bytes);
            let bloom_filter = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count as _);
            bfs.push(bloom_filter);
        }

        // Fewer stored filters than segments: several segments point at the same
        // filter. NOTE(review): presumably the storage deduplicates identical
        // segments - confirm in the `finalize_segment` module.
        assert_eq!(bfs.len(), 4);
        assert_eq!(meta.segment_loc_indices.len(), 10);

        // Segments 0..3 contain rows with "a"/"b".
        for idx in meta.segment_loc_indices.iter().take(3) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"a"));
            assert!(bf.contains(&b"b"));
        }
        // Segments 2..5 contain rows with "c"/"d" (segment 2 holds both batches).
        for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
        }
        // Segments 5..10 contain rows with "e"/"f".
        for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
            let bf = &bfs[*idx as usize];
            assert!(bf.contains(&b"e"));
            assert!(bf.contains(&b"f"));
        }
    }
422
    /// Checks that a trailing row without any elements still produces its own
    /// segment when the creator is finished.
    #[tokio::test]
    async fn test_final_seg_all_null() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Four rows fill two complete segments ...
        creator
            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        // ... and a fifth row with no elements opens a third, empty one.
        creator.push_row_elems(Vec::new()).await.unwrap();

        creator.finish(&mut writer).await.unwrap();

        // The last 4 bytes hold the little-endian size of the meta, which sits
        // directly in front of them.
        let bytes = writer.into_inner();
        let total_size = bytes.len();
        let meta_size_offset = total_size - 4;
        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());

        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();

        assert_eq!(meta.rows_per_segment, 2);
        // The element-less trailing row must be counted as a segment of its own.
        assert_eq!(meta.segment_count, 3);
        assert_eq!(meta.row_count, 5);
    }
454}