1use std::collections::{BTreeMap, VecDeque};
16use std::mem;
17use std::num::NonZeroUsize;
18use std::ops::RangeInclusive;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use common_telemetry::{debug, error};
24use futures::stream;
25use snafu::ResultExt;
26
27use crate::bitmap::Bitmap;
28use crate::external_provider::ExternalTempFileProvider;
29use crate::inverted_index::create::sort::intermediate_rw::{
30 IntermediateReader, IntermediateWriter,
31};
32use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
33use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
34use crate::inverted_index::create::sort_create::SorterFactory;
35use crate::inverted_index::error::{IntermediateSnafu, Result};
36use crate::{Bytes, BytesRef};
37
38pub struct ExternalSorter {
41 index_name: String,
43
44 temp_file_provider: Arc<dyn ExternalTempFileProvider>,
46
47 segment_null_bitmap: Bitmap,
49
50 values_buffer: BTreeMap<Bytes, (Bitmap, usize)>,
52
53 total_row_count: usize,
55
56 segment_row_count: NonZeroUsize,
59
60 current_memory_usage: usize,
62
63 current_memory_usage_threshold: Option<usize>,
68
69 global_memory_usage: Arc<AtomicUsize>,
71
72 global_memory_usage_sort_limit: Option<usize>,
77}
78
79#[async_trait]
80impl Sorter for ExternalSorter {
81 async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
84 if n == 0 {
85 return Ok(());
86 }
87
88 let segment_index_range = self.segment_index_range(n);
89 self.total_row_count += n;
90
91 if let Some(value) = value {
92 let memory_diff = self.push_not_null(value, segment_index_range);
93 self.may_dump_buffer(memory_diff).await
94 } else {
95 self.segment_null_bitmap.insert_range(segment_index_range);
96 Ok(())
97 }
98 }
99
100 async fn output(&mut self) -> Result<SortOutput> {
103 let readers = self
104 .temp_file_provider
105 .read_all(&self.index_name)
106 .await
107 .context(IntermediateSnafu)?;
108
109 let mut tree_nodes: VecDeque<SortedStream> = VecDeque::with_capacity(readers.len() + 1);
112 tree_nodes.push_back(Box::new(stream::iter(
113 mem::take(&mut self.values_buffer)
114 .into_iter()
115 .map(|(value, (bitmap, _))| Ok((value, bitmap))),
116 )));
117 for (_, reader) in readers {
118 tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
119 }
120
121 while tree_nodes.len() >= 2 {
122 let stream1 = tree_nodes.pop_front().unwrap();
124 let stream2 = tree_nodes.pop_front().unwrap();
125 let merged_stream = MergeSortedStream::merge(stream1, stream2);
126 tree_nodes.push_back(merged_stream);
127 }
128
129 Ok(SortOutput {
130 segment_null_bitmap: mem::take(&mut self.segment_null_bitmap),
131 sorted_stream: tree_nodes.pop_front().unwrap(),
132 total_row_count: self.total_row_count,
133 })
134 }
135}
136
137impl ExternalSorter {
138 pub fn new(
140 index_name: String,
141 temp_file_provider: Arc<dyn ExternalTempFileProvider>,
142 segment_row_count: NonZeroUsize,
143 current_memory_usage_threshold: Option<usize>,
144 global_memory_usage: Arc<AtomicUsize>,
145 global_memory_usage_sort_limit: Option<usize>,
146 ) -> Self {
147 Self {
148 index_name,
149 temp_file_provider,
150
151 segment_null_bitmap: Bitmap::new_bitvec(), values_buffer: BTreeMap::new(),
153
154 total_row_count: 0,
155 segment_row_count,
156
157 current_memory_usage: 0,
158 current_memory_usage_threshold,
159 global_memory_usage,
160 global_memory_usage_sort_limit,
161 }
162 }
163
164 pub fn factory(
166 temp_file_provider: Arc<dyn ExternalTempFileProvider>,
167 current_memory_usage_threshold: Option<usize>,
168 global_memory_usage: Arc<AtomicUsize>,
169 global_memory_usage_sort_limit: Option<usize>,
170 ) -> SorterFactory {
171 Box::new(move |index_name, segment_row_count| {
172 Box::new(Self::new(
173 index_name,
174 temp_file_provider.clone(),
175 segment_row_count,
176 current_memory_usage_threshold,
177 global_memory_usage.clone(),
178 global_memory_usage_sort_limit,
179 ))
180 })
181 }
182
183 fn push_not_null(
187 &mut self,
188 value: BytesRef<'_>,
189 segment_index_range: RangeInclusive<usize>,
190 ) -> usize {
191 match self.values_buffer.get_mut(value) {
192 Some((bitmap, mem_usage)) => {
193 bitmap.insert_range(segment_index_range);
194 let new_usage = bitmap.memory_usage() + value.len();
195 let diff = new_usage - *mem_usage;
196 *mem_usage = new_usage;
197
198 diff
199 }
200 None => {
201 let mut bitmap = Bitmap::new_roaring();
202 bitmap.insert_range(segment_index_range);
203
204 let mem_usage = bitmap.memory_usage() + value.len();
205 self.values_buffer
206 .insert(value.to_vec(), (bitmap, mem_usage));
207
208 mem_usage
209 }
210 }
211 }
212
213 async fn may_dump_buffer(&mut self, memory_diff: usize) -> Result<()> {
215 self.current_memory_usage += memory_diff;
216 let memory_usage = self.current_memory_usage;
217 self.global_memory_usage
218 .fetch_add(memory_diff, Ordering::Relaxed);
219
220 if self.global_memory_usage_sort_limit.is_none() {
221 return Ok(());
222 }
223
224 if self.global_memory_usage.load(Ordering::Relaxed)
225 < self.global_memory_usage_sort_limit.unwrap()
226 {
227 return Ok(());
228 }
229
230 if let Some(current_threshold) = self.current_memory_usage_threshold {
231 if memory_usage < current_threshold {
232 return Ok(());
233 }
234 }
235
236 let file_id = &format!("{:012}", self.total_row_count);
237 let index_name = &self.index_name;
238 let writer = self
239 .temp_file_provider
240 .create(index_name, file_id)
241 .await
242 .context(IntermediateSnafu)?;
243
244 let values = mem::take(&mut self.values_buffer);
245 self.global_memory_usage
246 .fetch_sub(memory_usage, Ordering::Relaxed);
247 self.current_memory_usage = 0;
248
249 let entries = values.len();
250 IntermediateWriter::new(writer).write_all(values.into_iter().map(|(k, (b, _))| (k, b))).await.inspect(|_|
251 debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
252 ).inspect_err(|e|
253 error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
254 )
255 }
256
257 fn segment_index_range(&self, n: usize) -> RangeInclusive<usize> {
260 let row_begin = self.total_row_count;
261 let start = self.segment_index(row_begin);
262 let end = self.segment_index(row_begin + n - 1);
263 start..=end
264 }
265
266 fn segment_index(&self, row_index: usize) -> usize {
268 row_index / self.segment_row_count
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::iter;
    use std::sync::Mutex;

    use futures::{AsyncRead, StreamExt};
    use rand::Rng;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Pushes `row_count` random rows into an `ExternalSorter` backed by in-memory mock files.
    /// It then checks the row count, the null bitmap, and every sorted (value, segments) pair.
    ///
    /// With `batch_push`, rows are pushed as runs through `push_n`; otherwise one at a time
    /// through `push`.
    async fn test_external_sorter(
        current_memory_usage_threshold: Option<usize>,
        global_memory_usage_sort_limit: Option<usize>,
        segment_row_count: usize,
        row_count: usize,
        batch_push: bool,
    ) {
        let mut mock_provider = MockExternalTempFileProvider::new();

        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |index_name, file_id| {
                assert_eq!(index_name, "test");
                let mut files = files.lock().unwrap();
                let (writer, reader) = duplex(1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });

        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |index_name| {
                assert_eq!(index_name, "test");
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });

        let mut sorter = ExternalSorter::new(
            "test".to_owned(),
            Arc::new(mock_provider),
            NonZeroUsize::new(segment_row_count).unwrap(),
            current_memory_usage_threshold,
            Arc::new(AtomicUsize::new(0)),
            global_memory_usage_sort_limit,
        );

        let mut sorted_result = if batch_push {
            let (dic_values, sorted_result) =
                dictionary_values_and_sorted_result(row_count, segment_row_count);

            for (value, n) in dic_values {
                sorter.push_n(value.as_deref(), n).await.unwrap();
            }

            sorted_result
        } else {
            let (mock_values, sorted_result) =
                shuffle_values_and_sorted_result(row_count, segment_row_count);

            for value in mock_values {
                sorter.push(value.as_deref()).await.unwrap();
            }

            sorted_result
        };

        let SortOutput {
            segment_null_bitmap,
            mut sorted_stream,
            total_row_count,
        } = sorter.output().await.unwrap();
        assert_eq!(total_row_count, row_count);
        let n = sorted_result.remove(&None);
        assert_eq!(
            segment_null_bitmap.iter_ones().collect::<Vec<_>>(),
            n.unwrap_or_default()
        );
        for (value, offsets) in sorted_result {
            let item = sorted_stream.next().await.unwrap().unwrap();
            assert_eq!(item.0, value.unwrap());
            assert_eq!(item.1.iter_ones().collect::<Vec<_>>(), offsets);
        }
        // Every distinct non-null value must appear exactly once.
        assert!(sorted_stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_external_sorter_pure_in_memory() {
        let current_memory_usage_threshold = None;
        let global_memory_usage_sort_limit = None;
        let total_row_count_cases = [0, 100, 1000, 10000];
        let segment_row_count_cases = [1, 10, 100, 1000];
        let batch_push_cases = [false, true];

        for total_row_count in total_row_count_cases {
            for segment_row_count in segment_row_count_cases {
                for batch_push in batch_push_cases {
                    test_external_sorter(
                        current_memory_usage_threshold,
                        global_memory_usage_sort_limit,
                        segment_row_count,
                        total_row_count,
                        batch_push,
                    )
                    .await;
                }
            }
        }
    }

    #[tokio::test]
    async fn test_external_sorter_pure_external() {
        let current_memory_usage_threshold = None;
        let global_memory_usage_sort_limit = Some(0);
        let total_row_count_cases = [0, 100, 1000, 10000];
        let segment_row_count_cases = [1, 10, 100, 1000];
        let batch_push_cases = [false, true];

        for total_row_count in total_row_count_cases {
            for segment_row_count in segment_row_count_cases {
                for batch_push in batch_push_cases {
                    test_external_sorter(
                        current_memory_usage_threshold,
                        global_memory_usage_sort_limit,
                        segment_row_count,
                        total_row_count,
                        batch_push,
                    )
                    .await;
                }
            }
        }
    }

    #[tokio::test]
    async fn test_external_sorter_mixed() {
        let current_memory_usage_threshold_cases = [None, Some(2048)];
        let global_memory_usage_sort_limit = Some(1024);
        let total_row_count_cases = [0, 100, 1000, 10000];
        let segment_row_count_cases = [1, 10, 100, 1000];
        let batch_push_cases = [false, true];

        for total_row_count in total_row_count_cases {
            for segment_row_count in segment_row_count_cases {
                for batch_push in batch_push_cases {
                    for current_memory_usage_threshold in current_memory_usage_threshold_cases {
                        test_external_sorter(
                            current_memory_usage_threshold,
                            global_memory_usage_sort_limit,
                            segment_row_count,
                            total_row_count,
                            batch_push,
                        )
                        .await;
                    }
                }
            }
        }
    }

    /// Returns `None` or `size` random bytes, each with probability 1/2.
    fn random_option_bytes(size: usize) -> Option<Vec<u8>> {
        let mut rng = rand::rng();

        if rng.random() {
            let mut buffer = vec![0u8; size];
            rng.fill(&mut buffer[..]);
            Some(buffer)
        } else {
            None
        }
    }

    type Values = Vec<Option<Bytes>>;
    type DictionaryValues = Vec<(Option<Bytes>, usize)>;
    type ValueSegIds = BTreeMap<Option<Bytes>, Vec<usize>>;

    /// Generates `row_count` independent random values plus their expected sorted result.
    fn shuffle_values_and_sorted_result(
        row_count: usize,
        segment_row_count: usize,
    ) -> (Values, ValueSegIds) {
        let mock_values = iter::repeat_with(|| random_option_bytes(100))
            .take(row_count)
            .collect::<Vec<_>>();

        let sorted_result = sorted_result(&mock_values, segment_row_count);
        (mock_values, sorted_result)
    }

    /// Generates random runs `(value, run_length)` summing to `row_count` rows, plus the
    /// expected sorted result of the expanded rows.
    fn dictionary_values_and_sorted_result(
        row_count: usize,
        segment_row_count: usize,
    ) -> (DictionaryValues, ValueSegIds) {
        let mut n = row_count;
        let mut rng = rand::rng();
        let mut dic_values = Vec::new();

        while n > 0 {
            let size = rng.random_range(1..=n);
            let value = random_option_bytes(100);
            dic_values.push((value, size));
            n -= size;
        }

        let mock_values = dic_values
            .iter()
            .flat_map(|(value, size)| std::iter::repeat_n(value.clone(), *size))
            .collect::<Vec<_>>();

        let sorted_result = sorted_result(&mock_values, segment_row_count);
        (dic_values, sorted_result)
    }

    /// Reference implementation: maps each value (including `None`) to the ascending, deduplicated
    /// list of segment indices it occurs in.
    fn sorted_result(values: &Values, segment_row_count: usize) -> ValueSegIds {
        let mut sorted_result = BTreeMap::new();
        for (row_index, value) in values.iter().enumerate() {
            let to_add_segment_index = row_index / segment_row_count;
            let indices = sorted_result.entry(value.clone()).or_insert_with(Vec::new);

            // Rows are visited in order, so a repeated segment can only be the last one pushed.
            if indices.last() != Some(&to_add_segment_index) {
                indices.push(to_add_segment_index);
            }
        }

        sorted_result
    }
}
504}