//! Flush related utilities and structs.

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
///
/// Tracks the memtable memory usage of the engine and decides whether the
/// engine should flush memtables or stall write requests.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes of the write buffer.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are scheduled to be freed, e.g. the
    /// memtables holding them are now being flushed. The bytes still count
    /// towards the total usage until [`WriteBufferManager::free_mem`] is called.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;
}

/// Reference-counted pointer to a [`WriteBufferManager`].
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

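/// Default [`WriteBufferManager`] implementation.
///
/// A minimal usage sketch (illustrative values; the notifier is a tokio
/// `watch` channel, and the receiver observes the signal via `has_changed()`):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::watch::channel(());
/// let manager = WriteBufferManagerImpl::new(1024 * 1024).with_notifier(tx);
/// manager.reserve_mem(4096);        // a write buffers 4 KiB
/// manager.schedule_free_mem(4096);  // the memtable starts flushing
/// manager.free_mem(4096);           // flush done; `rx` is notified
/// ```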
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Total memory budget for all memtables, in bytes.
    global_write_buffer_size: usize,
    /// Memory budget for mutable memtables, in bytes.
    mutable_limit: usize,
    /// Memory used by all memtables, mutable and flushing.
    memory_used: AtomicUsize,
    /// Memory used by mutable memtables only.
    memory_active: AtomicUsize,
    /// Optional notifier to wake up waiters once memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the specific `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier that is signaled whenever memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable memtable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

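// Worked example of the thresholds in `should_flush_engine` below
// (hypothetical numbers): with `global_write_buffer_size = 1000` the mutable
// limit is 500.
// - mutable usage 600               -> flush (over the mutable limit)
// - total 1000, mutable usage 400   -> no flush (flushing reclaims too little)
// - total 1000, mutable usage 500   -> flush (over the total limit)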
impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the total usage exceeds the global limit, only flush when the
        // mutable part is large enough for a flush to actually reclaim memory.
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies waiters that memory is freed. Ignoring the error is
            // fine: it only means there is no receiver alive.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

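/// Reason of a flush task.
///
/// The `IntoStaticStr` derive turns each variant into its name (e.g.
/// `"EngineFull"`), which this module uses as a metric label:
///
/// ```ignore
/// FLUSH_REQUESTS_TOTAL
///     .with_label_values(&[task.reason.as_str()])
///     .inc();
/// ```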
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memory usage of the engine reached its limit.
    EngineFull,
    /// Manually requested flush.
    Manual,
    /// Flush while altering the region.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush while downgrading the region to a follower.
    Downgrading,
}

impl FlushReason {
    /// Returns the reason as a static string (the variant name).
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
    /// Pushes a new `sender` to the task so it gets notified on completion.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all waiters of the success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends a flush error to all waiters.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Take a snapshot of the version before submitting the job so the
        // flush sees all memtables frozen so far.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the region worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry id of the memtables we flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so failed flushes don't skew the metric.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables to level 0 SSTs, updates the manifest
    /// and returns the [`RegionEdit`] that records the new files.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let stats = mem.stats();
            let max_sequence = stats.max_sequence();
            series_count += stats.series_count();
            let iter = mem.iter(None, None, None)?;
            let source = Source::Iter(iter);

            // Flush the memtable to level 0.
            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let ssts_written = self
                .access_layer
                .write_sst(write_request, &write_opts)
                .await?;
            if ssts_written.is_empty() {
                // No SST produced for this memtable.
                continue;
            }

            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}s",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            timer.stop_and_record(),
        );

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A region being downgraded is still allowed to finish its flush.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            RegionLeaderState::Writable
        };
        // Persist the edit to the manifest while the region is in the
        // expected state.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Notifies the region worker about the flush job status.
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self.request_sender.send(request).await {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task for the same region into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // We only merge senders; both tasks flush the same memtables.
        self.senders.append(&mut other.senders);
    }
}

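/// Flush scheduler of a region worker.
///
/// It runs at most one flush job per region at a time. A simplified view of
/// the control flow (sketch, using only methods defined below):
///
/// ```ignore
/// scheduler.schedule_flush(region_id, &version_control, task)?;
/// // ... the background job finishes ...
/// let pending = scheduler.on_flush_success(region_id);
/// // or: scheduler.on_flush_failed(region_id, err);
/// ```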
pub(crate) struct FlushScheduler {
    /// Flush status of tracked regions.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for the given region.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // The region has nothing to flush; finish the task immediately.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Add the region to the status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running; merge the task so its waiters
            // are notified once a later flush finishes.
            flush_status.merge_task(task);
            return Ok(());
        }

        // If there is already a pending task, merge the new task into it
        // instead of queueing two jobs for the same region.
        if flush_status.pending_task.is_some() {
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region. Freeze the mutable memtable first.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Remove the region from the status map on failure.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit the flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Remove the region from the status map on failure.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        // The region now has a running flush job.
        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job of the region succeeded.
    ///
    /// Returns the pending requests of the region if it has no more tasks to flush.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // The region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // No pending flush task; remove the region and hand back its
            // pending requests. Safety: the status exists as checked above.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing left to flush, so the pending task
                // can be finished directly. Safety: pending_task is some.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                // Remove the region and return its pending requests.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // There is more data to flush; keep the region in the status map.
                None
            }
        };

        // Schedule the next flush of other regions.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} succeeded, but failed to schedule the next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job of the region failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancelling all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove the region from the status map.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail all pending tasks and requests of the region.
        flush_status.on_failure(err);

        // Still try to schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule the next flush after the flush of region {} failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove the region from the status map.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail all pending tasks and requests of the region.
        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue of the region.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue of the region.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue of the region.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules the next pending flush task, if any.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Take the first region that has a pending task.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        // Safety: pending_task is some.
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is shutting down; fail all pending tasks and requests.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region tracked by the [`FlushScheduler`].
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush job is running for this region.
    flushing: bool,
    /// Task waiting for the next flush slot of the region.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Pending bulk write requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the `task` into the pending task, or stores it as the pending
    /// task if there is none.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
        // Pending bulk writes are dropped with `self`; their waiters observe
        // the closed result channel.
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
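
    // Not part of the original suite: checks that `IntoStaticStr` maps each
    // reason to its variant name, which is what the metric labels rely on.
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }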

    #[test]
    fn test_over_mutable_limit() {
        // The mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Mutable usage (800) exceeds the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Scheduling frees the mutable part but keeps the total usage.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Freeing releases the total usage.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        manager.schedule_free_mem(200);
        // Mutable usage (900) is still over the mutable limit (500).
        assert!(manager.should_flush_engine());

        manager.schedule_free_mem(450);
        // Mutable usage (450) is below half of the global limit; flushing it
        // wouldn't reclaim enough memory.
        assert!(!manager.should_flush_engine());

        manager.reserve_mem(50);
        // Total usage is over the global limit and the mutable part (500)
        // reaches half of it.
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
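
    // Not part of the original suite: `should_stall` uses an inclusive bound,
    // so the engine stalls exactly when usage reaches the global budget.
    #[test]
    fn test_stall_boundary() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(999);
        assert!(!manager.should_stall());
        manager.reserve_mem(1);
        assert!(manager.should_stall());
    }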

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only `free_mem` signals the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        // Schedules a flush for a region without data.
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The task finishes immediately and the scheduler stays empty.
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Write data to the mutable memtable so the region has data to flush.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Create 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedule the first task; it becomes the running flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // One flush job is submitted.
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen on scheduling.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule the remaining tasks; they merge into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Remove the frozen memtable, simulating a successful flush.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        // On success, nothing is left to flush, so the scheduler finishes the
        // pending task directly instead of submitting another job.
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}