1use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::region_request::RegionFlushReason;
30use store_api::storage::{RegionId, SequenceNumber};
31use strum::IntoStaticStr;
32use tokio::sync::{Semaphore, mpsc, watch};
33
34use crate::access_layer::{
35 AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
36};
37use crate::cache::CacheManagerRef;
38use crate::config::MitoConfig;
39use crate::error::{
40 Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
41 RegionTruncatedSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
45use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
46use crate::metrics::{
47 FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
48 INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::FlatSource;
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
54use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
55use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
56use crate::request::{
57 BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
58 SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
59};
60use crate::schedule::scheduler::{Job, SchedulerRef};
61use crate::sst::file::FileMeta;
62use crate::sst::parquet::metadata::extract_primary_key_range;
63use crate::sst::parquet::{
64 DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
65};
66use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
67use crate::worker::WorkerListener;
68
/// Tracks write buffer (memtable) memory usage and decides when the engine
/// should flush or stall writes.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush because write
    /// buffers are over their limit.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether writes should be stalled until memory is freed.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes of write buffer memory.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are scheduled to be freed
    /// (e.g. the owning memtable is being flushed) but are still in use.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes have actually been freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory currently tracked by the manager.
    fn memory_usage(&self) -> usize;

    /// Returns the limit that triggers flushes.
    fn flush_limit(&self) -> usize;
}
100
/// Shared reference to a [WriteBufferManager] implementation.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
102
/// Default [WriteBufferManager] backed by atomic counters.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global write buffer size limit across all regions.
    global_write_buffer_size: usize,
    /// Mutable memtable memory limit (half of the global limit) that
    /// triggers engine flushes.
    mutable_limit: usize,
    /// Total memory tracked: mutable memtables plus memory scheduled to
    /// be freed (being flushed).
    memory_used: AtomicUsize,
    /// Memory of mutable memtables only.
    memory_active: AtomicUsize,
    /// Optional watch channel notified whenever memory is actually freed.
    notifier: Option<watch::Sender<()>>,
}
121
impl WriteBufferManagerImpl {
    /// Creates a manager with the given global write buffer size limit.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a watch sender that is notified when memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the current memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable memtable limit: half of the global buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}
151
152impl WriteBufferManager for WriteBufferManagerImpl {
153 fn should_flush_engine(&self) -> bool {
154 let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
155 if mutable_memtable_memory_usage >= self.mutable_limit {
156 debug!(
157 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
158 mutable_memtable_memory_usage,
159 self.memory_usage(),
160 self.mutable_limit,
161 self.global_write_buffer_size,
162 );
163 return true;
164 }
165
166 let memory_usage = self.memory_used.load(Ordering::Relaxed);
167 if memory_usage >= self.global_write_buffer_size {
168 return true;
169 }
170
171 false
172 }
173
174 fn should_stall(&self) -> bool {
175 self.memory_usage() >= self.global_write_buffer_size
176 }
177
178 fn reserve_mem(&self, mem: usize) {
179 self.memory_used.fetch_add(mem, Ordering::Relaxed);
180 self.memory_active.fetch_add(mem, Ordering::Relaxed);
181 }
182
183 fn schedule_free_mem(&self, mem: usize) {
184 self.memory_active.fetch_sub(mem, Ordering::Relaxed);
185 }
186
187 fn free_mem(&self, mem: usize) {
188 self.memory_used.fetch_sub(mem, Ordering::Relaxed);
189 if let Some(notifier) = &self.notifier {
190 let _ = notifier.send(());
194 }
195 }
196
197 fn memory_usage(&self) -> usize {
198 self.memory_used.load(Ordering::Relaxed)
199 }
200
201 fn flush_limit(&self) -> usize {
202 self.mutable_limit
203 }
204}
205
/// Reason a region flush was requested; the static string form is used as
/// a metric label.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// The engine reached its write buffer limit.
    EngineFull,
    /// Manually requested flush.
    Manual,
    /// Flush to apply an alteration.
    Alter,
    /// Periodic flush.
    Periodically,
    /// The region is downgrading.
    Downgrading,
    /// Flush before entering staging mode.
    EnterStaging,
    /// Flush while closing the region.
    Closing,
    /// Flush for region migration.
    RegionMigration,
    /// Flush for repartition.
    Repartition,
    /// Flush to allow pruning the remote WAL.
    RemoteWalPrune,
}
232
impl FlushReason {
    /// Returns the static string of this reason (via `IntoStaticStr`),
    /// used as a metric label value.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}
239
impl From<RegionFlushReason> for FlushReason {
    /// Maps the external request reason to the internal flush reason.
    fn from(reason: RegionFlushReason) -> Self {
        match reason {
            RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
            RegionFlushReason::Repartition => FlushReason::Repartition,
            RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
        }
    }
}
249
/// Task that flushes the immutable memtables of a region to SST files.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Why this flush was triggered.
    pub(crate) reason: FlushReason,
    /// Result senders notified when the flush finishes or fails.
    pub(crate) senders: Vec<OutputTx>,
    /// Channel back to the region worker loop.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the SST row group size when set.
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index build options for the flushed SSTs.
    pub(crate) index_options: IndexOptions,
    /// Bounds the number of concurrent SST write jobs.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Serialized partition expression of the region, if any.
    pub(crate) partition_expr: Option<String>,
}
279
280impl RegionFlushTask {
    /// Adds the sender (if present) so it gets notified when the flush
    /// completes.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }
287
    /// Consumes the task and notifies all waiters of success (0 rows).
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }
294
    /// Sends the flush error, wrapped in [FlushRegionSnafu], to all waiters
    /// and clears the sender list.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }
303
    /// Converts the task into a background [Job] that flushes the current
    /// version of the region.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Snapshot the version before the job runs so the set of immutable
        // memtables to flush is fixed.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }
318
    /// Runs the flush and reports the outcome back to the region worker as
    /// a [WorkerRequest::Background] notification.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the flushed immutable memtables, so the worker can
                // remove them from the version.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    // Hand the waiters to the worker; it notifies them after
                    // applying the edit.
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                    flush_reason: self.reason,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Don't record the elapsed time of a failed flush.
                timer.stop_and_discard();

                let err = Arc::new(e);
                // Fail our own waiters immediately; the worker is still
                // notified so it can clean up its state.
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }
364
    /// Flushes the immutable memtables to SSTs and applies the resulting
    /// [RegionEdit] to the manifest.
    ///
    /// Returns the applied edit on success.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // A task-level row group size overrides the default.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        // Summarize the written files for logging.
        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // Entry id and sequence come from the version snapshot taken
            // when the job was created.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // The manifest update checks the region state; a downgrading flush
        // expects the downgrading state, otherwise staging or writable
        // depending on the current state.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }
460
    /// Writes each non-empty immutable memtable to SSTs.
    ///
    /// Returns the collected file metas and aggregated flush statistics.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut encoded_part_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
        for mem in memtables {
            if mem.is_empty() {
                // Nothing to flush for this memtable.
                continue;
            }

            // Compact the memtable first; a failure is logged but does not
            // abort the flush.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();

            let num_mem_rows = mem_ranges.num_rows();
            let memtable_series_count = mem_ranges.series_count();
            let memtable_id = mem.id();
            series_count += memtable_series_count;

            let flush_start = Instant::now();
            let FlushFlatMemResult {
                num_encoded,
                num_sources,
                results,
            } = self
                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                .await?;
            encoded_part_count += num_encoded;
            // Collect the output of each concurrent write task.
            for (source_idx, result) in results.into_iter().enumerate() {
                let (max_sequence, ssts_written, metrics) = result?;
                if ssts_written.is_empty() {
                    // The source produced no SSTs.
                    continue;
                }

                common_telemetry::debug!(
                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                    self.region_id,
                    memtable_id,
                    source_idx,
                    num_sources,
                    metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                for sst_info in ssts_written {
                    flushed_bytes += sst_info.file_size;
                    // Extract primary key bounds from the parquet metadata
                    // when available.
                    let pk_range = sst_info
                        .file_metadata
                        .as_ref()
                        .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
                    file_metas.push(Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                        pk_range,
                    ));
                }
            }

            common_telemetry::debug!(
                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                self.region_id,
                num_sources,
                memtable_id,
                num_mem_ranges,
                num_encoded,
                num_mem_rows,
                flush_start.elapsed(),
                compact_cost,
            );
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        })
    }
563
    /// Spawns concurrent tasks that write the memtable ranges to SSTs.
    ///
    /// Already-encoded ranges are uploaded directly via `put_sst`; other
    /// ranges go through `write_sst`. Concurrency is bounded by
    /// `flush_semaphore`.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let field_column_start =
            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            field_column_start,
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        // Iterator-based sources: encode and write through the access layer.
        for (source, max_sequence) in flat_sources.sources {
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        // Pre-encoded parts: upload the bytes as-is.
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }
625
626 fn new_file_meta(
627 region_id: RegionId,
628 max_sequence: u64,
629 sst_info: SstInfo,
630 partition_expr: Option<PartitionExpr>,
631 primary_key_range: Option<(Bytes, Bytes)>,
632 ) -> FileMeta {
633 let (primary_key_min, primary_key_max) = match primary_key_range {
634 Some((min, max)) => (Some(min), Some(max)),
635 None => (None, None),
636 };
637 FileMeta {
638 region_id,
639 file_id: sst_info.file_id,
640 time_range: sst_info.time_range,
641 level: 0,
642 file_size: sst_info.file_size,
643 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
644 available_indexes: sst_info.index_metadata.build_available_indexes(),
645 indexes: sst_info.index_metadata.build_indexes(),
646 index_file_size: sst_info.index_metadata.file_size,
647 index_version: 0,
648 num_rows: sst_info.num_rows as u64,
649 num_row_groups: sst_info.num_row_groups,
650 sequence: NonZeroU64::new(max_sequence),
651 partition_expr,
652 num_series: sst_info.num_series,
653 primary_key_min,
654 primary_key_max,
655 }
656 }
657
    /// Builds the [SstWriteRequest] that flushes `source` into a new SST.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: FlatSource,
    ) -> SstWriteRequest {
        // Region option wins; otherwise fall back to the engine default.
        let flat_format = version
            .options
            .sst_format
            .map(|f| f == FormatType::Flat)
            .unwrap_or(self.engine_config.default_flat_format);
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            sst_write_format: if flat_format {
                FormatType::Flat
            } else {
                FormatType::PrimaryKey
            },
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            #[cfg(feature = "vector_index")]
            vector_index_config: self.engine_config.vector_index.clone(),
        }
    }
690
    /// Sends a request back to the region worker loop; a send failure is
    /// logged but not propagated.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }
704
    /// Merges another flush task of the same region into this one by taking
    /// over its waiters; the rest of `other` is dropped.
    ///
    /// # Panics
    /// Panics if the tasks belong to different regions.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        self.senders.append(&mut other.senders);
    }
711}
712
/// Output of [RegionFlushTask::flush_flat_mem_ranges].
struct FlushFlatMemResult {
    /// Number of sources that came from pre-encoded parts.
    num_encoded: usize,
    /// Total number of flush sources (iterator based + encoded).
    num_sources: usize,
    /// Per-source results: (max sequence, written SSTs, write metrics).
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
718
/// Aggregated output of [RegionFlushTask::do_flush_memtables].
struct DoFlushMemtablesResult {
    /// Metadata of all files written by the flush.
    file_metas: Vec<FileMeta>,
    /// Total bytes written to SSTs.
    flushed_bytes: u64,
    /// Total number of series flushed.
    series_count: usize,
    /// Number of parts flushed from pre-encoded data.
    encoded_part_count: usize,
    /// Accumulated write metrics.
    flush_metrics: Metrics,
}
726
/// Flush sources grouped by whether the data is already encoded.
struct FlatSources {
    /// Iterator-based sources paired with their max sequence.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Pre-encoded parts paired with their max sequence.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
731
/// Groups the ranges of a memtable into flushable sources.
///
/// Already-encoded ranges are collected separately so they can be uploaded
/// without re-encoding. The remaining ranges are merged (and deduplicated
/// unless in append mode), batching small ranges together until at least
/// `ENCODE_ROW_THRESHOLD` rows are accumulated per source.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        debug!("Flushing single flat range");

        // A single range needs no merge; dedup directly if required.
        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None, None)?;
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
        // Only rows from ranges that are not already encoded need merging.
        let total_rows: usize = ranges
            .values()
            .filter(|r| r.encoded().is_none())
            .map(|r| r.num_rows())
            .sum();
        debug!(
            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
            total_rows,
            min_flush_rows,
            ranges.len()
        );
        let mut rows_remaining = total_rows;
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                // Encoded ranges bypass the merge path entirely.
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None, None)?;
            input_iters.push(iter);
            let range_rows = range.num_rows();
            last_iter_rows += range_rows;
            rows_remaining -= range_rows;
            current_ranges.push(range);

            // Cut a source once enough rows accumulated, unless the leftover
            // would be too small to fill a row group (keep batching then).
            if last_iter_rows >= min_flush_rows
                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
            {
                debug!(
                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                    last_iter_rows,
                    min_flush_rows,
                    input_iters.len(),
                    rows_remaining
                );

                // A source's sequence is the max sequence of its ranges.
                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Flush whatever is left after the loop.
        if !input_iters.is_empty() {
            debug!(
                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                last_iter_rows,
                min_flush_rows,
                input_iters.len(),
                rows_remaining
            );
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}
866
867pub fn merge_and_dedup(
912 schema: &SchemaRef,
913 append_mode: bool,
914 merge_mode: MergeMode,
915 field_column_start: usize,
916 input_iters: Vec<BoxedRecordBatchIterator>,
917) -> Result<BoxedRecordBatchIterator> {
918 let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
919 let maybe_dedup = if append_mode {
920 Box::new(merge_iter) as _
922 } else {
923 match merge_mode {
925 MergeMode::LastRow => {
926 Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
927 }
928 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
929 merge_iter,
930 FlatLastNonNull::new(field_column_start, false),
931 )) as _,
932 }
933 };
934 Ok(maybe_dedup)
935}
936
937pub fn maybe_dedup_one(
938 append_mode: bool,
939 merge_mode: MergeMode,
940 field_column_start: usize,
941 input_iter: BoxedRecordBatchIterator,
942) -> BoxedRecordBatchIterator {
943 if append_mode {
944 input_iter
946 } else {
947 match merge_mode {
949 MergeMode::LastRow => {
950 Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
951 }
952 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
953 input_iter,
954 FlatLastNonNull::new(field_column_start, false),
955 )),
956 }
957 }
958}
959
/// Schedules region flush tasks and tracks in-flight flushes per region.
pub(crate) struct FlushScheduler {
    /// Flush status of each region with an in-flight or pending flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that runs the flush jobs.
    scheduler: SchedulerRef,
}
967
968impl FlushScheduler {
    /// Creates a flush scheduler that submits jobs to `scheduler`.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }
976
    /// Returns whether a flush is in flight or pending for the region.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }
981
    /// Freezes the mutable memtable of the region and submits the flush job
    /// to the background scheduler.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // Freeze the mutable memtable so new writes go to a fresh one and
        // the flushed set stays fixed.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }
1006
    /// Schedules a flush for the region, merging it into an in-flight flush
    /// if one exists.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // Nothing to flush: finish the task immediately.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // A flush is already running: queue the task behind it.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Track the in-flight flush so later requests get merged/queued.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }
1047
    /// Notifies the scheduler that the flush of `region_id` succeeded.
    ///
    /// Returns the queued DDL/write/bulk requests when the region has no
    /// more flush work; returns `None` when another flush was scheduled
    /// (the region stays tracked).
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // A task is queued, but if the memtables are already empty there is
        // nothing left to flush, so finish it right away.
        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            let task = flush_status.pending_task.take().unwrap();
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        debug!("Scheduling pending flush task for region {}", region_id);
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        None
    }
1111
    /// Notifies the scheduler that the flush of `region_id` failed and
    /// fails every task/request queued behind it.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }
1126
    /// Cancels pending flush work because the region was dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }
1134
    /// Cancels pending flush work because the region was closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }
1139
    /// Cancels pending flush work because the region was truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }
1147
    /// Removes the region from the status map and fails its pending task
    /// and queued requests with `err`. No-op if the region is untracked.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }
1157
    /// Queues a DDL request to run after the region's flush completes.
    ///
    /// # Panics
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }
1166
    /// Queues a write request to run after the region's flush completes.
    ///
    /// # Panics
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }
1178
    /// Queues a bulk write request to run after the region's flush completes.
    ///
    /// # Panics
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }
1187
1188 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1190 self.region_status
1191 .get(®ion_id)
1192 .map(|status| !status.pending_ddls.is_empty())
1193 .unwrap_or(false)
1194 }
1195}
1196
impl Drop for FlushScheduler {
    fn drop(&mut self) {
        // The scheduler is going away: fail every tracked region so queued
        // waiters are not left hanging.
        for (region_id, flush_status) in self.region_status.drain() {
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}
1205
/// Flush state of a region: the in-flight flush plus requests queued
/// behind it.
struct FlushStatus {
    /// Region this status belongs to.
    region_id: RegionId,
    /// Version control of the region, used to check memtables and to
    /// schedule the next flush.
    version_control: VersionControlRef,
    /// A queued flush task to run after the current flush finishes.
    pending_task: Option<RegionFlushTask>,
    /// DDL requests waiting for the flush.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests waiting for the flush.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests waiting for the flush.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1223
impl FlushStatus {
    /// Creates an empty status for a region with a newly scheduled flush.
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges a new task into the pending slot, or stores it if empty.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and queued DDL/write requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
        // NOTE(review): `pending_bulk_writes` are dropped here without an
        // error being sent to their senders — confirm whether bulk waiters
        // are notified elsewhere, otherwise they may hang.
    }
}
1263
1264#[cfg(test)]
1265mod tests {
1266 use mito_codec::row_converter::build_primary_key_codec;
1267 use tokio::sync::oneshot;
1268
1269 use super::*;
1270 use crate::cache::CacheManager;
1271 use crate::memtable::bulk::part::BulkPartConverter;
1272 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1273 use crate::memtable::{Memtable, RangesOptions};
1274 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1275 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1276 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1277 use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1278
    /// The mutable limit is half of the global write buffer size.
    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
1286
    /// Crossing the mutable limit triggers a flush; scheduled frees reduce
    /// only the active part, real frees reduce the total.
    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 1000 / 2 = 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // 800 active exceeds the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Scheduling a free lowers only the active counter.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Actually freeing lowers the used counter.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }
1310
    #[test]
    fn test_over_global() {
        // Global limit 1000; reserving 1100 exceeds it so writes must stall.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Scheduling 200 for free: still over the global limit, so the
        // manager keeps asking for a flush and keeps stalling.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());
        // Scheduling another 450: used memory is still 1100, so stalling and
        // flushing continue until memory is actually freed.
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());
        // Additional reservations while over the limit keep requesting a
        // flush.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1333
    #[test]
    fn test_manager_notify() {
        // The manager notifies the watch channel only when memory is actually
        // freed, not on reserve or on scheduling a free.
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // free_mem() is the only operation that triggers the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1345
    #[tokio::test]
    async fn test_schedule_empty() {
        // Scheduling a flush for a region with nothing to flush should finish
        // immediately: the sender gets Ok(0) and no region status is kept.
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Nothing was queued for this region.
        assert!(scheduler.region_status.is_empty());
        // The waiting sender still receives a successful result (0 rows).
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1382
    #[tokio::test]
    async fn test_schedule_pending_request() {
        // Verifies that flush requests scheduled while a flush is running are
        // merged into a pending task and completed once the flush succeeds.
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Give the memtable some rows so the flush is not a no-op.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Three identical tasks targeting the same region.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // The first scheduled task starts a real flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Scheduling the flush freezes the mutable memtable (id 0).
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // The remaining tasks arrive while a flush is running: they are
        // merged into the pending task instead of spawning new jobs.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulate the running flush completing: remove immutable memtable 0
        // via an (empty) region edit, as the real flush job would.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new job was spawned (nothing left to flush) and the region
        // status was cleared.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        // All merged requests complete successfully with 0 affected rows.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1465
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // With a single memtable range holding two rows that share the same
        // key and timestamp, memtable_flat_sources must dedup to one row when
        // append_mode is off and keep both rows when append_mode is on.
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        // Two rows with identical key "dup_key" and identical timestamp 1000,
        // but different field values (1.0 and 2.0).
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Builds a fresh bulk memtable with the given append_mode, writes the
        // duplicated part into it and returns its flush ranges.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode off + LastRow merge => duplicates are deduped.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Count rows produced by the iterator source; a Stream source is
            // not expected for memtable ranges here.
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode on => duplicates must be preserved.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
1575
    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        // Verifies that a task merged while a flush is running is itself
        // scheduled as a new flush job once the running flush succeeds and
        // there is new data to flush.
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Seed the memtable so the first flush has work to do.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Two identical tasks targeting the same region.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // First task starts a real flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Second task arrives while the flush is running: it becomes the
        // region's pending task instead of a new job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // Scheduling the first flush froze the mutable memtable (id 0).
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Simulate the running flush finishing: remove immutable memtable 0
        // via an (empty) region edit.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        // New data arrives, so the pending task has something to flush.
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        // The pending task was promoted to a second flush job and is no
        // longer pending.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
1664}