index/inverted_index/create/sort/
external_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::mem;
17use std::num::NonZeroUsize;
18use std::ops::RangeInclusive;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use common_telemetry::{debug, error};
24use futures::stream;
25use snafu::ResultExt;
26
27use crate::bitmap::Bitmap;
28use crate::external_provider::ExternalTempFileProvider;
29use crate::inverted_index::create::sort::intermediate_rw::{
30    IntermediateReader, IntermediateWriter,
31};
32use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
33use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
34use crate::inverted_index::create::sort_create::SorterFactory;
35use crate::inverted_index::error::{IntermediateSnafu, Result};
36use crate::{Bytes, BytesRef};
37
/// `ExternalSorter` manages the sorting of data using both in-memory structures and external files.
/// It dumps data to external files when the in-memory buffer crosses a certain memory threshold.
///
/// Non-null values are kept in a `BTreeMap` ordered by their bytes. Each dump writes the whole
/// buffer to one intermediate file, and [`Sorter::output`] merges whatever is still buffered with
/// all intermediate files into a single sorted stream.
pub struct ExternalSorter {
    /// The index name associated with the sorting operation. It is also the name under which
    /// intermediate files are created and later read back through `temp_file_provider`.
    index_name: String,

    /// Manages creation and access to external temporary files
    temp_file_provider: Arc<dyn ExternalTempFileProvider>,

    /// Bitmap indicating which segments have null values (bits are segment indices)
    segment_null_bitmap: Bitmap,

    /// In-memory buffer to hold values and their corresponding bitmaps until memory threshold is exceeded.
    /// Each entry maps a value to the bitmap of segment indices in which it occurs, paired with the
    /// estimated memory usage of that entry (bitmap memory usage + value length in bytes).
    values_buffer: BTreeMap<Bytes, (Bitmap, usize)>,

    /// Count of all rows ingested so far, null or not. Its value at dump time is also used to
    /// derive the id of the intermediate file being written.
    total_row_count: usize,

    /// The number of rows per group for bitmap indexing which determines how rows are
    /// batched for indexing. It is used to determine which segment a row belongs to
    /// (`segment index = row index / segment_row_count`).
    segment_row_count: NonZeroUsize,

    /// Tracks memory usage of the buffer: the sum of the per-entry estimates accumulated since the
    /// last dump. It is reset to 0 when the buffer is dumped.
    current_memory_usage: usize,

    /// The threshold of current memory usage below which the buffer is not dumped, even if the global memory
    /// usage reaches `global_memory_usage_sort_limit`. This allows for smaller buffers to remain in memory,
    /// providing a buffer against unnecessary dumps to external files, which can be costly in terms of performance.
    /// `None` indicates that only the global memory usage threshold is considered for dumping the buffer.
    current_memory_usage_threshold: Option<usize>,

    /// Tracks the global memory usage of all sorters sharing this counter
    global_memory_usage: Arc<AtomicUsize>,

    /// The memory usage limit that, when reached or exceeded by the global memory consumption of all sorters,
    /// necessitates a reassessment of buffer retention. Reaching this limit signals that there is a high overall
    /// memory pressure, potentially requiring buffer dumping to external storage for memory relief.
    /// `None` means no global limit is set, in which case the buffer is never dumped and sorting stays in memory.
    global_memory_usage_sort_limit: Option<usize>,
}
78
79#[async_trait]
80impl Sorter for ExternalSorter {
81    /// Pushes n identical values into the sorter, adding them to the in-memory buffer and dumping
82    /// the buffer to an external file if necessary
83    async fn push_n(&mut self, value: Option<BytesRef<'_>>, n: usize) -> Result<()> {
84        if n == 0 {
85            return Ok(());
86        }
87
88        let segment_index_range = self.segment_index_range(n);
89        self.total_row_count += n;
90
91        if let Some(value) = value {
92            let memory_diff = self.push_not_null(value, segment_index_range);
93            self.may_dump_buffer(memory_diff).await
94        } else {
95            self.segment_null_bitmap.insert_range(segment_index_range);
96            Ok(())
97        }
98    }
99
100    /// Finalizes the sorting operation, merging data from both in-memory buffer and external files
101    /// into a sorted stream
102    async fn output(&mut self) -> Result<SortOutput> {
103        let readers = self
104            .temp_file_provider
105            .read_all(&self.index_name)
106            .await
107            .context(IntermediateSnafu)?;
108
109        // TODO(zhongzc): k-way merge instead of 2-way merge
110
111        let mut tree_nodes: VecDeque<SortedStream> = VecDeque::with_capacity(readers.len() + 1);
112        tree_nodes.push_back(Box::new(stream::iter(
113            mem::take(&mut self.values_buffer)
114                .into_iter()
115                .map(|(value, (bitmap, _))| Ok((value, bitmap))),
116        )));
117        for (_, reader) in readers {
118            tree_nodes.push_back(IntermediateReader::new(reader).into_stream().await?);
119        }
120
121        while tree_nodes.len() >= 2 {
122            // every turn, the length of tree_nodes will be reduced by 1 until only one stream left
123            let stream1 = tree_nodes.pop_front().unwrap();
124            let stream2 = tree_nodes.pop_front().unwrap();
125            let merged_stream = MergeSortedStream::merge(stream1, stream2);
126            tree_nodes.push_back(merged_stream);
127        }
128
129        Ok(SortOutput {
130            segment_null_bitmap: mem::take(&mut self.segment_null_bitmap),
131            sorted_stream: tree_nodes.pop_front().unwrap(),
132            total_row_count: self.total_row_count,
133        })
134    }
135}
136
137impl ExternalSorter {
138    /// Constructs a new `ExternalSorter`
139    pub fn new(
140        index_name: String,
141        temp_file_provider: Arc<dyn ExternalTempFileProvider>,
142        segment_row_count: NonZeroUsize,
143        current_memory_usage_threshold: Option<usize>,
144        global_memory_usage: Arc<AtomicUsize>,
145        global_memory_usage_sort_limit: Option<usize>,
146    ) -> Self {
147        Self {
148            index_name,
149            temp_file_provider,
150
151            segment_null_bitmap: Bitmap::new_bitvec(), // bitvec is more efficient for many null values
152            values_buffer: BTreeMap::new(),
153
154            total_row_count: 0,
155            segment_row_count,
156
157            current_memory_usage: 0,
158            current_memory_usage_threshold,
159            global_memory_usage,
160            global_memory_usage_sort_limit,
161        }
162    }
163
164    /// Generates a factory function that creates new `ExternalSorter` instances
165    pub fn factory(
166        temp_file_provider: Arc<dyn ExternalTempFileProvider>,
167        current_memory_usage_threshold: Option<usize>,
168        global_memory_usage: Arc<AtomicUsize>,
169        global_memory_usage_sort_limit: Option<usize>,
170    ) -> SorterFactory {
171        Box::new(move |index_name, segment_row_count| {
172            Box::new(Self::new(
173                index_name,
174                temp_file_provider.clone(),
175                segment_row_count,
176                current_memory_usage_threshold,
177                global_memory_usage.clone(),
178                global_memory_usage_sort_limit,
179            ))
180        })
181    }
182
183    /// Pushes the non-null values to the values buffer and sets the bits within
184    /// the specified range in the given bitmap to true.
185    /// Returns the memory usage difference of the buffer after the operation.
186    fn push_not_null(
187        &mut self,
188        value: BytesRef<'_>,
189        segment_index_range: RangeInclusive<usize>,
190    ) -> usize {
191        match self.values_buffer.get_mut(value) {
192            Some((bitmap, mem_usage)) => {
193                bitmap.insert_range(segment_index_range);
194                let new_usage = bitmap.memory_usage() + value.len();
195                let diff = new_usage - *mem_usage;
196                *mem_usage = new_usage;
197
198                diff
199            }
200            None => {
201                let mut bitmap = Bitmap::new_roaring();
202                bitmap.insert_range(segment_index_range);
203
204                let mem_usage = bitmap.memory_usage() + value.len();
205                self.values_buffer
206                    .insert(value.to_vec(), (bitmap, mem_usage));
207
208                mem_usage
209            }
210        }
211    }
212
213    /// Checks if the in-memory buffer exceeds the threshold and offloads it to external storage if necessary
214    async fn may_dump_buffer(&mut self, memory_diff: usize) -> Result<()> {
215        self.current_memory_usage += memory_diff;
216        let memory_usage = self.current_memory_usage;
217        self.global_memory_usage
218            .fetch_add(memory_diff, Ordering::Relaxed);
219
220        if self.global_memory_usage_sort_limit.is_none() {
221            return Ok(());
222        }
223
224        if self.global_memory_usage.load(Ordering::Relaxed)
225            < self.global_memory_usage_sort_limit.unwrap()
226        {
227            return Ok(());
228        }
229
230        if let Some(current_threshold) = self.current_memory_usage_threshold {
231            if memory_usage < current_threshold {
232                return Ok(());
233            }
234        }
235
236        let file_id = &format!("{:012}", self.total_row_count);
237        let index_name = &self.index_name;
238        let writer = self
239            .temp_file_provider
240            .create(index_name, file_id)
241            .await
242            .context(IntermediateSnafu)?;
243
244        let values = mem::take(&mut self.values_buffer);
245        self.global_memory_usage
246            .fetch_sub(memory_usage, Ordering::Relaxed);
247        self.current_memory_usage = 0;
248
249        let entries = values.len();
250        IntermediateWriter::new(writer).write_all(values.into_iter().map(|(k, (b, _))| (k, b))).await.inspect(|_|
251            debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
252        ).inspect_err(|e|
253            error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
254        )
255    }
256
257    /// Determines the segment index range for the row index range
258    /// `[row_begin, row_begin + n - 1]`
259    fn segment_index_range(&self, n: usize) -> RangeInclusive<usize> {
260        let row_begin = self.total_row_count;
261        let start = self.segment_index(row_begin);
262        let end = self.segment_index(row_begin + n - 1);
263        start..=end
264    }
265
266    /// Determines the segment index for the given row index
267    fn segment_index(&self, row_index: usize) -> usize {
268        row_index / self.segment_row_count
269    }
270}
271
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::iter;
    use std::sync::Mutex;

    use futures::{AsyncRead, StreamExt};
    use rand::Rng;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Pushes `row_count` random (possibly null) values into an `ExternalSorter` backed by an
    /// in-memory mock file provider. It then checks that the output matches a reference result
    /// computed directly from the pushed values, and that the sorted stream yields nothing extra.
    ///
    /// `batch_push` selects between `push_n` with runs of identical values and one `push` per row.
    async fn test_external_sorter(
        current_memory_usage_threshold: Option<usize>,
        global_memory_usage_sort_limit: Option<usize>,
        segment_row_count: usize,
        row_count: usize,
        batch_push: bool,
    ) {
        let mut mock_provider = MockExternalTempFileProvider::new();

        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |index_name, file_id| {
                assert_eq!(index_name, "test");
                let mut files = files.lock().unwrap();
                // Nothing reads the pipe until `read_all`, so it must hold a whole dump.
                let (writer, reader) = duplex(1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });

        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |index_name| {
                assert_eq!(index_name, "test");
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });

        let mut sorter = ExternalSorter::new(
            "test".to_owned(),
            Arc::new(mock_provider),
            NonZeroUsize::new(segment_row_count).unwrap(),
            current_memory_usage_threshold,
            Arc::new(AtomicUsize::new(0)),
            global_memory_usage_sort_limit,
        );

        let mut sorted_result = if batch_push {
            let (dic_values, sorted_result) =
                dictionary_values_and_sorted_result(row_count, segment_row_count);

            for (value, n) in dic_values {
                sorter.push_n(value.as_deref(), n).await.unwrap();
            }

            sorted_result
        } else {
            let (mock_values, sorted_result) =
                shuffle_values_and_sorted_result(row_count, segment_row_count);

            for value in mock_values {
                sorter.push(value.as_deref()).await.unwrap();
            }

            sorted_result
        };

        let SortOutput {
            segment_null_bitmap,
            mut sorted_stream,
            total_row_count,
        } = sorter.output().await.unwrap();
        assert_eq!(total_row_count, row_count);

        // Nulls are reported through the null bitmap, never through the sorted stream.
        let null_segments = sorted_result.remove(&None);
        assert_eq!(
            segment_null_bitmap.iter_ones().collect::<Vec<_>>(),
            null_segments.unwrap_or_default()
        );

        for (value, offsets) in sorted_result {
            let (item_value, item_bitmap) = sorted_stream.next().await.unwrap().unwrap();
            assert_eq!(item_value, value.unwrap());
            assert_eq!(item_bitmap.iter_ones().collect::<Vec<_>>(), offsets);
        }
        // Extra (e.g. unmerged duplicate) items would otherwise go unnoticed.
        assert!(
            sorted_stream.next().await.is_none(),
            "sorted stream yielded more items than expected"
        );
    }

    /// Runs `test_external_sorter` over every combination of row count, segment size and push
    /// mode for the given memory configuration.
    async fn test_external_sorter_all_cases(
        current_memory_usage_threshold: Option<usize>,
        global_memory_usage_sort_limit: Option<usize>,
    ) {
        for total_row_count in [0, 100, 1000, 10000] {
            for segment_row_count in [1, 10, 100, 1000] {
                for batch_push in [false, true] {
                    test_external_sorter(
                        current_memory_usage_threshold,
                        global_memory_usage_sort_limit,
                        segment_row_count,
                        total_row_count,
                        batch_push,
                    )
                    .await;
                }
            }
        }
    }

    #[tokio::test]
    async fn test_external_sorter_pure_in_memory() {
        test_external_sorter_all_cases(None, None).await;
    }

    #[tokio::test]
    async fn test_external_sorter_pure_external() {
        test_external_sorter_all_cases(None, Some(0)).await;
    }

    #[tokio::test]
    async fn test_external_sorter_mixed() {
        for current_memory_usage_threshold in [None, Some(2048)] {
            test_external_sorter_all_cases(current_memory_usage_threshold, Some(1024)).await;
        }
    }

    /// Returns `Some` random bytes of length `size` or `None`, each with probability 1/2.
    fn random_option_bytes(size: usize) -> Option<Vec<u8>> {
        let mut rng = rand::rng();

        if rng.random() {
            let mut buffer = vec![0u8; size];
            rng.fill(&mut buffer[..]);
            Some(buffer)
        } else {
            None
        }
    }

    type Values = Vec<Option<Bytes>>;
    type DictionaryValues = Vec<(Option<Bytes>, usize)>;
    type ValueSegIds = BTreeMap<Option<Bytes>, Vec<usize>>;

    /// Generates `row_count` independent random values and their expected sorted result.
    fn shuffle_values_and_sorted_result(
        row_count: usize,
        segment_row_count: usize,
    ) -> (Values, ValueSegIds) {
        let mock_values = iter::repeat_with(|| random_option_bytes(100))
            .take(row_count)
            .collect::<Vec<_>>();

        let sorted_result = sorted_result(&mock_values, segment_row_count);
        (mock_values, sorted_result)
    }

    /// Generates random `(value, run length)` pairs totalling `row_count` rows and their
    /// expected sorted result.
    fn dictionary_values_and_sorted_result(
        row_count: usize,
        segment_row_count: usize,
    ) -> (DictionaryValues, ValueSegIds) {
        let mut n = row_count;
        let mut rng = rand::rng();
        let mut dic_values = Vec::new();

        while n > 0 {
            let size = rng.random_range(1..=n);
            let value = random_option_bytes(100);
            dic_values.push((value, size));
            n -= size;
        }

        let mock_values = dic_values
            .iter()
            .flat_map(|(value, size)| iter::repeat_n(value.clone(), *size))
            .collect::<Vec<_>>();

        let sorted_result = sorted_result(&mock_values, segment_row_count);
        (dic_values, sorted_result)
    }

    /// Reference implementation: maps each distinct value to the ascending, de-duplicated list
    /// of segment indices it appears in.
    fn sorted_result(values: &Values, segment_row_count: usize) -> ValueSegIds {
        let mut sorted_result = BTreeMap::new();
        for (row_index, value) in values.iter().enumerate() {
            let to_add_segment_index = row_index / segment_row_count;
            let indices = sorted_result.entry(value.clone()).or_insert_with(Vec::new);

            if indices.last() != Some(&to_add_segment_index) {
                indices.push(to_add_segment_index);
            }
        }

        sorted_result
    }
}