use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
    SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

/// A manager that tracks the memory consumed by memtables and decides when the
/// engine should flush memtables or stall write requests.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether memory usage is high enough to trigger an engine-level flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall incoming write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes of memory for a write.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes of memory are about to be freed once
    /// the in-progress flush finishes.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes of memory have been freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory currently reserved, in bytes.
    fn memory_usage(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation backed by atomic counters.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global memory limit for all memtables, in bytes.
    global_write_buffer_size: usize,
    /// Soft limit for mutable memtables, half of the global limit.
    mutable_limit: usize,
    /// Total memory reserved by memtables, mutable and immutable.
    memory_used: AtomicUsize,
    /// Memory held by mutable memtables, i.e. not yet scheduled to be freed.
    memory_active: AtomicUsize,
    /// Optional notifier to wake up waiters once memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier that is signalled whenever memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable memtable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

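// The engine drives a manager through a reserve -> schedule_free -> free cycle.
// A minimal sketch of that lifecycle (hypothetical sizes, not a real write path):
//
//     let manager = WriteBufferManagerImpl::new(1000);
//     manager.reserve_mem(400);          // a write lands in a mutable memtable
//     manager.schedule_free_mem(400);    // the memtable is frozen for flushing
//     manager.free_mem(400);             // the flush persisted the data
//
// With a 1000-byte global limit, `mutable_limit` is 500: the engine flushes once
// mutable usage exceeds 500, or once total usage reaches 1000 while mutable
// usage is at least 500; `should_stall` is true whenever total usage reaches 1000.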
impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        if memory_usage >= self.global_write_buffer_size {
            // Total usage hits the global limit. Only flush if mutable memtables
            // still hold at least half of the limit; otherwise most memory is
            // already scheduled to be freed by in-progress flushes.
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Ignore the error: the receiver side may already be closed.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memory usage of the engine reaches the limit.
    EngineFull,
    /// Flush triggered manually.
    Manual,
    /// Flush to alter the region.
    Alter,
    /// Periodic flush.
    Periodically,
    /// Flush while the region leader is downgrading to a follower.
    Downgrading,
}

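// `IntoStaticStr` derives `From<&FlushReason> for &'static str`, so `as_str`
// yields the variant name (e.g. "Manual"); the scheduler uses these strings as
// labels for the FLUSH_REQUESTS_TOTAL metric.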
impl FlushReason {
    /// Converts the reason to a static string.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
    /// Pushes a new sender to the task so it also receives the flush result.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all senders of the success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiting senders.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

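    // The version data is snapshotted synchronously, before the async job runs,
    // so the job flushes exactly the memtables that were immutable at schedule time.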
    /// Converts the flush task into a background job that can be scheduled.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so failed flushes don't skew the histogram.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

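    // Each immutable memtable is written out as level-0 SSTs. Empty memtables
    // and empty write results are skipped, so a flush may legitimately produce
    // zero files while still advancing the flushed entry id and sequence.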
    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] applied by this flush.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let max_sequence = mem.stats().max_sequence();
            let iter = mem.iter(None, None, None)?;
            let source = Source::Iter(iter);

            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let ssts_written = self
                .access_layer
                .write_sst(write_request, &write_opts)
                .await?;
            if ssts_written.is_empty() {
                // The memtable produced no SSTs; skip it.
                continue;
            }

            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, cost: {:?}s",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            timer.stop_and_record(),
        );

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A flush triggered by downgrading runs while the region is in the
        // Downgrading state; all other flushes require a writable region.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            RegionLeaderState::Writable
        };
        // Persist the region edit to the manifest.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Notifies the region worker of the job status. Failures are only logged
    /// since there is nowhere else to report the error.
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self.request_sender.send(request).await {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

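    // Merging keeps a single task per region: the surviving task adopts the
    // other task's result senders, so every caller still observes the outcome.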
    /// Merges `other` into this task. Both tasks must target the same region.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        self.senders.append(&mut other.senders);
    }
}

/// Manages background flushes of regions in a worker.
pub(crate) struct FlushScheduler {
    /// Tracked regions and their flush status.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Returns a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region has a flush task running or pending.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

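    // Per-region flush states driven by this scheduler:
    //   (untracked) --schedule_flush--> flushing
    //   flushing --schedule_flush--> flushing + pending task (tasks merge)
    //   flushing --on_flush_success/failed--> pending task runs, or untracked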
    /// Schedules a flush `task` for the region, merging it into an existing
    /// task if one is already running or pending.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // The region has nothing to flush; finish the task immediately.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running; merge the task so it runs later.
            flush_status.merge_task(task);
            return Ok(());
        }

        if flush_status.pending_task.is_some() {
            // A task is already waiting; merge into it.
            flush_status.merge_task(task);
            return Ok(());
        }

        // Freeze the mutable memtable so the new job can flush it.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Stop tracking the region since no job was started.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit the flush job to the background scheduler.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Stop tracking the region since the job was rejected.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished, returning any
    /// DDL and write requests that were parked while the region was flushing.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // The region is no longer flushing.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // No pending flush task; stop tracking the region and hand back
            // the pending requests.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((flush_status.pending_ddls, flush_status.pending_writes))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The finished job already flushed all memtables, so the
                // pending task has nothing to do; finish it and stop tracking.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((flush_status.pending_ddls, flush_status.pending_writes))
            } else {
                // There are still memtables to flush; keep the pending task.
                None
            }
        };

        // Schedule the next flush job, which may belong to another region.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job failed and cancels all tasks
    /// pending for the region.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_ERRORS_TOTAL.inc();

        // The region may already be untracked, e.g. if it was dropped.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail the pending task and requests with the same error.
        flush_status.on_failure(err);

        // Schedule the next flush job, which may belong to another region.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

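    // Dropped, closed, and truncated regions all funnel through the same
    // cleanup: the status entry is removed and every pending task and request
    // receives the corresponding error.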
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region's status.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail the pending task and requests.
        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue. Panics if the region has no
    /// tracked flush status.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue. Panics if the region has no
    /// tracked flush status.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

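    // Called after a flush finishes or fails: picks one region with a pending
    // task (if any) and starts that flush.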
    /// Schedules the next pending flush task, if there is one.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Pick a region that has a pending task.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

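// Dropping the scheduler fails every outstanding task and request with a
// RegionClosed error so no waiter hangs forever.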
impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // Clean up pending tasks.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Tracks the flush status of a region scheduled by the [FlushScheduler].
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush job is currently running for the region.
    flushing: bool,
    /// Task waiting for the running flush job to finish.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests to run after the flush completes.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests to run after the flush completes.
    pending_writes: Vec<SenderWriteRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task, or stores it if none is pending.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        let manager = WriteBufferManagerImpl::new(1000);
        // Reserves 400 bytes: below the mutable limit (500).
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Reserves another 400 bytes: mutable usage (800) exceeds the limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Schedules 400 bytes to be freed: mutable usage drops to 400.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Frees 400 bytes: total usage drops to 400.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        // Over the global limit (1100 >= 1000), so writes should stall.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Mutable usage drops below half of the global limit (450 < 500).
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Mutable usage reaches half of the global limit again (500 >= 500).
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only `free_mem` signals the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        // The region has no memtable to flush, so the task finishes immediately.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Use a real memtable builder so the region has data to flush.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes some rows.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedules the first task: it runs immediately.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The region is tracked and one job is running.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Scheduling the flush should freeze the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks: they merge into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulates the first flush finishing: apply an edit that removes memtable 0.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new job is scheduled: the memtables are already empty, so the
        // pending task completes without running.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}