use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableRanges;
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::PredicateGroup;
use crate::read::Source;
use crate::region::options::{IndexOptions, MergeMode};
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

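/// Global write buffer (memtable) manager.
///
/// Tracks memtable memory usage across all regions and decides whether the
/// engine should flush memtables or stall new write requests.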
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
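    /// Returns whether the engine should schedule a flush to reclaim memory.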
    fn should_flush_engine(&self) -> bool;

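    /// Returns whether the engine should stall new write requests.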
    fn should_stall(&self) -> bool;

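    /// Reserves `mem` bytes for a write to a mutable memtable.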
    fn reserve_mem(&self, mem: usize);

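    /// Tells the manager that `mem` bytes are in the process of being freed,
    /// e.g. the memtable holding them is frozen and queued for flush, so they
    /// no longer count toward the mutable usage.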
    fn schedule_free_mem(&self, mem: usize);

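    /// Tells the manager that `mem` bytes are freed, e.g. the flushed memtable
    /// is dropped.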
    fn free_mem(&self, mem: usize);

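    /// Returns the total memory reserved by memtables, in bytes.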
    fn memory_usage(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

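/// Default [`WriteBufferManager`] implementation.
///
/// A sketch of the intended call sequence (a hypothetical caller; in practice
/// the engine drives these calls from its write and flush paths):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1024 * 1024 * 1024);
/// manager.reserve_mem(4096); // a write buffers 4 KiB in a mutable memtable
/// manager.schedule_free_mem(4096); // the memtable is frozen for flushing
/// manager.free_mem(4096); // the flush finished, the memtable is dropped
/// assert_eq!(0, manager.memory_usage());
/// ```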
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Total write buffer size limit in bytes.
    global_write_buffer_size: usize,
    /// Memory limit for mutable memtables (half of the global limit).
    mutable_limit: usize,
    /// Memory reserved by all memtables, mutable and immutable.
    memory_used: AtomicUsize,
    /// Memory reserved by mutable memtables only.
    memory_active: AtomicUsize,
    /// Optional sender to notify waiters once memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
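    /// Returns a new manager with the given `global_write_buffer_size` in bytes.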
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

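    /// Attaches a sender that is notified whenever memory is freed.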
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

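    /// Returns the memory usage of mutable memtables.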
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

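    /// Returns the mutable memtable memory limit: half of the global write
    /// buffer size.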
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Uses half of the global size as the mutable limit.
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        // Checks the soft limit on mutable memtables first.
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the total usage exceeds the global limit, only flush when the
        // mutable usage is still large enough to make the flush worthwhile;
        // otherwise a flush of frozen memtables should already be in progress.
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage,
                );
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage,
                );
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies the receiver that memory is freed. Ignores the send
            // error since the receiver may already be dropped.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

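/// Reason of a flush task. The variant name doubles as the label value of the
/// flush request metric (via [`IntoStaticStr`]).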
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memory usage of the engine reaches its limit.
    EngineFull,
    /// Manually triggered flush.
    Manual,
    /// Flush before altering the region.
    Alter,
    /// Periodic (auto) flush.
    Periodically,
    /// Flush while downgrading the region leader.
    Downgrading,
}

impl FlushReason {
    /// Converts the reason to a static string for metric labels.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

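/// Task to flush the immutable memtables of a region.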
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
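    /// Pushes a new flush result `sender` to the task.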
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

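    /// Consumes the task and notifies all waiters of the success.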
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

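    /// Sends the flush error to all waiters.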
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

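    /// Converts the flush task into a background job.
    ///
    /// This should be called in the region worker so the version captured by
    /// the job is consistent with the worker's view of the region.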
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Captures the current version. The flush job only flushes memtables
        // that are already immutable in this version.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

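    /// Runs the flush task and notifies the worker of the result.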
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // Uses the last entry id in the version as the flushed entry id.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discards the timer so the failure doesn't pollute the histogram.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

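    /// Flushes the immutable memtables to level 0 SSTs and updates the
    /// manifest. Returns the [`RegionEdit`] applied to the manifest.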
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // Uses the version in `version_data` so the flushed memtables match
        // the entry id and sequence recorded in it.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skips empty memtables.
                continue;
            }

            let MemtableRanges { ranges, stats } =
                mem.ranges(None, PredicateGroup::default(), None)?;

            let max_sequence = stats.max_sequence();
            series_count += stats.series_count();

            let source = if ranges.len() == 1 {
                let only_range = ranges.into_values().next().unwrap();
                let iter = only_range.build_iter()?;
                Source::Iter(iter)
            } else {
                // Merges rows from multiple ranges and dedups them if needed.
                let sources = ranges
                    .into_values()
                    .map(|r| r.build_iter().map(Source::Iter))
                    .collect::<Result<Vec<_>>>()?;
                let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
                let maybe_dedup = if version.options.append_mode {
                    // No dedup in append mode.
                    Box::new(merge_reader) as _
                } else {
                    // Dedups according to the merge mode.
                    match version.options.merge_mode.unwrap_or(MergeMode::LastRow) {
                        MergeMode::LastRow => {
                            Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                        }
                        MergeMode::LastNonNull => {
                            Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                        }
                    }
                };
                Source::Reader(maybe_dedup)
            };

            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let (ssts_written, metrics) = self
                .access_layer
                .write_sst(write_request, &write_opts, WriteType::Flush)
                .await?;
            if ssts_written.is_empty() {
                // No data written.
                continue;
            }
            flush_metrics = flush_metrics.merge(metrics);

            // Flushed files always go to level 0.
            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            compaction_time_window: None,
            // Uses the last entry id and committed sequence in the version
            // data as the flushed entry id and sequence.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A downgrading region may still flush before it becomes a follower;
        // otherwise the region must be writable.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            RegionLeaderState::Writable
        };
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

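    /// Notifies the region worker of the flush job status.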
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

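    /// Merges another flush task into this one, taking over its waiters.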
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Now we only merge senders. They share the same flush reason.
        self.senders.append(&mut other.senders);
    }
}

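/// Manages background flush tasks for regions.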
pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

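    /// Returns whether the region already has a flush task running or pending.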
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

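    /// Schedules a flush `task` for the region.
    ///
    /// If the region is already flushing, or a task is pending, the new task
    /// is merged into the existing one; otherwise the mutable memtable is
    /// frozen and a flush job is submitted to the scheduler.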
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // The region has nothing to flush, notifies the waiters directly.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Gets or creates the flush status of the region.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running, merges the task so it runs later.
            flush_status.merge_task(task);
            return Ok(());
        }

        if flush_status.pending_task.is_some() {
            // A task is already pending, merges the new task into it.
            flush_status.merge_task(task);
            return Ok(());
        }

        // Freezes the mutable memtable so new writes go to a fresh memtable.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submits the flush job to the background scheduler.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

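    /// Notifies the scheduler that the flush job of the region succeeded.
    ///
    /// Returns the pending DDL, write, and bulk write requests of the region
    /// if the region has nothing left to flush.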
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // The region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region has no pending flush task, removes its status and
            // returns the pending requests.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush, notifies the pending task directly.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // The region still has data to flush, keeps its status so
                // `schedule_next_flush` can pick up the pending task.
                None
            }
        };

        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

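    /// Notifies the scheduler that the flush job of the region failed.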
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, canceling all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Removes the status of this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fails all pending tasks and requests.
        flush_status.on_failure(err);

        // Still tries to schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after flush of region {} failed", region_id);
        }
    }

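    /// Notifies the scheduler that the region is dropped.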
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

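    /// Notifies the scheduler that the region is closed.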
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

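    /// Notifies the scheduler that the region is truncated.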
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

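    /// Removes the status of the region and fails all of its pending tasks
    /// and requests with `err`.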
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

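    /// Adds a DDL request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region has no flush status (i.e. no flush was requested).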
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

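    /// Adds a write request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region has no flush status (i.e. no flush was requested).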
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

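    /// Adds a bulk write request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region has no flush status (i.e. no flush was requested).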
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

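    /// Returns whether the region has pending DDL requests.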
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

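    /// Schedules a flush task for the next region that has one pending.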
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Finds a region with a pending task to flush next.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is dropped, fails all pending tasks and requests.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

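/// Flush status of a region scheduled by the [`FlushScheduler`].
///
/// Tracks the running flush job of a region along with the task and requests
/// waiting for it to finish.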
struct FlushStatus {
    /// Id of the region to flush.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush job is running for this region.
    ///
    /// The scheduler submits a new flush job only when this is `false`, so
    /// there is at most one running flush job per region.
    flushing: bool,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// DDL requests pending until the flush completes.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests pending until the flush completes.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests pending until the flush completes.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

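    /// Merges the task into the pending task, or stores it if there is none.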
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

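    /// Consumes the status and fails the pending task and requests with `err`.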
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 1000 / 2 = 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Exceeds the mutable limit (800 > 500).
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes 400 bytes: they no longer count toward the mutable usage.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases the frozen memory.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        // Over the global limit (1100 > 1000).
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Mutable usage drops to 900, still over the mutable limit.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Mutable usage drops to 450, below half of the global limit.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Mutable usage reaches 500, half of the global limit, while the
        // total usage is still over the global limit.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only `free_mem` sends the notification.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        // The region has nothing to flush, so the task succeeds immediately.
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the default memtable builder so the memtable is not empty.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes rows to the memtable.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedules the first task, which submits a flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen, so the immutable memtable id is 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks. They are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Applies an edit that removes the immutable memtable so the region
        // has nothing to flush when the first job finishes.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush job is scheduled.
        assert_eq!(1, job_scheduler.num_jobs());
        // The pending tasks are notified.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}