use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::MemtableRanges;
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::PredicateGroup;
use crate::read::Source;
use crate::region::options::{IndexOptions, MergeMode};
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;

/// Manages memory usage of write buffers (memtables) across the engine.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether memtable usage is high enough that the engine should flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes for a write.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes will be freed soon, e.g. the memtable
    /// holding them is frozen and scheduled to flush.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are freed, e.g. the flushed data is persisted.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by write buffers, in bytes.
    fn memory_usage(&self) -> usize;
}

/// Reference-counted pointer to a [WriteBufferManager].
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Memory limit for mutable memtables.
    mutable_limit: usize,
    /// Total memory used by memtables (mutable and immutable).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free yet (used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier to wake up stalled requests once memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier that is signalled whenever memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable memtable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        // Flush if mutable memtables alone exceed the mutable limit.
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage, self.memory_usage(), self.mutable_limit, self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If total usage exceeds the global limit, flush only when mutable
        // memtables still hold at least half of it; otherwise most memory is
        // already being flushed and another flush wouldn't help.
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage,
                    self.global_write_buffer_size,
                    mutable_memtable_memory_usage);
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notify the worker so stalled write requests can make progress.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}
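
// A minimal lifecycle sketch (illustrative only; the byte counts and call
// sites are assumptions, not engine requirements): a writer reserves memory
// when rows land in a mutable memtable, schedules it free once the memtable
// is frozen for flush, and frees it after the flushed data is persisted.
//
//     let manager = WriteBufferManagerImpl::new(8 * 1024 * 1024);
//     manager.reserve_mem(4096);       // rows written to a mutable memtable
//     manager.schedule_free_mem(4096); // memtable frozen, flush scheduled
//     manager.free_mem(4096);          // SST persisted, memory reclaimed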

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches the flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter the region.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush because the region is downgrading to follower.
    Downgrading,
}

impl FlushReason {
    /// Returns the flush reason as a static string (the variant name), via the
    /// derived `IntoStaticStr` conversion.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
    /// Pushes a new flush result `sender` to the task.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all senders that the flush succeeded.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiting senders.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the task into a background flush job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before the job starts so we only flush
        // memtables visible to this version.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush job and notifies the region worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry id of the memtables to flush.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so a failed flush doesn't pollute the latency metrics.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let MemtableRanges { ranges, stats } =
                mem.ranges(None, PredicateGroup::default(), None)?;

            let max_sequence = stats.max_sequence();
            series_count += stats.series_count();

            let source = if ranges.len() == 1 {
                // A single range can be read directly.
                let only_range = ranges.into_values().next().unwrap();
                let iter = only_range.build_iter()?;
                Source::Iter(iter)
            } else {
                // Multiple ranges are merged, then deduplicated unless the
                // region is in append mode.
                let sources = ranges
                    .into_values()
                    .map(|r| r.build_iter().map(Source::Iter))
                    .collect::<Result<Vec<_>>>()?;
                let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
                let maybe_dedup = if version.options.append_mode {
                    // In append mode, we don't need to dedup.
                    Box::new(merge_reader) as _
                } else {
                    // Dedup according to the merge mode of the region.
                    match version.options.merge_mode.unwrap_or(MergeMode::LastRow) {
                        MergeMode::LastRow => {
                            Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                        }
                        MergeMode::LastNonNull => {
                            Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                        }
                    }
                };
                Source::Reader(maybe_dedup)
            };

            // Flush this memtable to one or more SSTs.
            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };

            let (ssts_written, metrics) = self
                .access_layer
                .write_sst(write_request, &write_opts, WriteType::Flush)
                .await?;
            if ssts_written.is_empty() {
                // No data written.
                continue;
            }
            flush_metrics = flush_metrics.merge(metrics);

            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry id of the memtables to flush.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // A staging region stays in staging after flush; otherwise it must be writable.
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // Persist the edit to the manifest under the expected region state.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Notifies the region worker of the flush job status.
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task for the same region into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // We only merge senders; both tasks keep this task's flush reason.
        self.senders.append(&mut other.senders);
    }
}
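
// Result-flow sketch (illustrative): `do_flush` reports back to the region
// worker through `request_sender`, so completion is observed as a background
// notification rather than a return value.
//
//     flush_memtables() -> Ok(edit) => BackgroundNotify::FlushFinished { edit, .. }
//     flush_memtables() -> Err(e)   => BackgroundNotify::FlushFailed { err }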

/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
    /// Tracked regions.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for the region with `version_control`.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Add the region to the status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running; merge the new task so it runs later.
            flush_status.merge_task(task);
            return Ok(());
        }

        if flush_status.pending_task.is_some() {
            // A task is already pending; merge with it.
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Remove from the status map.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If the scheduler returns an error, it won't run this job.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Remove from the status map.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region has no flush task left.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // This region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: the region exists in the status map.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush; complete the pending task directly.
                // Safety: pending_task is not None.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                // We can remove the region from the status map.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // The region still has data to flush; keep it in the status map.
                None
            }
        };

        // Schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail all pending tasks and requests of this region.
        flush_status.on_failure(err);

        // Still try to schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fail all pending tasks and requests of this region.
        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task when the scheduler can submit the next task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));

        // Pick a region with a pending task.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        // Safety: pending_task is not None.
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}
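
// Scheduling-flow sketch (illustrative): at most one flush job runs per
// region; extra tasks merge into the pending slot and run later.
//
//     scheduler.schedule_flush(region_id, &version_control, task_a)?; // runs now
//     scheduler.schedule_flush(region_id, &version_control, task_b)?; // merged as pending
//     // ... task_a finishes, the worker calls on_flush_success(region_id) ...
//     // task_b is either completed immediately (nothing left to flush) or
//     // submitted by schedule_next_flush().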

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is dropped; fail all pending tasks and requests.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush task is running. A region may have a pending task
    /// while not flushing if the scheduler hasn't picked it yet.
    flushing: bool,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests waiting to execute after the flush.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests waiting to execute after the flush.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the `task` with the pending task, or stores it as the pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
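
    #[test]
    fn test_flush_reason_as_str() {
        // Added example: the derived `IntoStaticStr` conversion turns each
        // variant into its name, which the flush metrics use as a label.
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("Manual", FlushReason::Manual.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }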

    #[test]
    fn test_over_mutable_limit() {
        // The mutable limit is 1000 / 2 = 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Mutable usage (800) exceeds the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezing a memtable moves its memory out of the active (mutable) count.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releasing flushed memtables reduces the used count.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Mutable usage (900) is still over the mutable limit.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Mutable usage (450) is below half of the global limit; most memory
        // is already being flushed, so another flush wouldn't help.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Mutable usage reaches half of the global limit (500) while the total
        // is still over the global limit.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only `free_mem` signals the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
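
    #[test]
    fn test_stall_boundary() {
        // Added example: stalling starts exactly at the global limit and
        // clears once the used memory is freed.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1000);
        assert!(manager.should_stall());
        manager.schedule_free_mem(1000);
        manager.free_mem(1000);
        assert!(!manager.should_stall());
        assert_eq!(0, manager.memory_usage());
    }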

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        // Scheduling a flush for an empty region completes the task immediately.
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes some rows so the region has data to flush.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // Schedules the first task; it should submit one job and freeze the memtable.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The scheduler should have frozen the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks; they merge into a single pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulates the flush job finishing by removing the immutable memtable.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // The pending task completes directly since nothing is left to flush,
        // so no new job is submitted and the status map is cleared.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}