use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::{RegionId, SequenceNumber};
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
    RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;

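/// Global write buffer (memtable) manager.
///
/// Tracks the memory usage of mutable and flushing memtables and decides when the
/// engine should flush or stall writes.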
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush to reclaim memory.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether writes should be stalled until memory is freed.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes written to mutable memtables.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes will be freed by a scheduled flush.
    fn schedule_free_mem(&self, mem: usize);

    /// Releases `mem` bytes once the flush job has finished.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory currently reserved.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memory limit that triggers a flush.
    fn flush_limit(&self) -> usize;
}

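/// Reference-counted [WriteBufferManager].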
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

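/// Default [WriteBufferManager] implementation.
///
/// Requests a flush once mutable usage exceeds half of the global write buffer size
/// and stalls writes once total usage exceeds the global limit.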
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global write buffer size in bytes.
    global_write_buffer_size: usize,
    /// Mutable memtable memory limit in bytes.
    mutable_limit: usize,
    /// Total memory reserved by memtables.
    memory_used: AtomicUsize,
    /// Memory used by mutable memtables (not yet scheduled to be freed).
    memory_active: AtomicUsize,
    /// Optional notifier signalled whenever memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier that is signalled whenever memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the current memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage >= self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        if memory_usage >= self.global_write_buffer_size {
            return true;
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Wake up any waiter of the write buffer manager.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    fn flush_limit(&self) -> usize {
        self.mutable_limit
    }
}

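/// Reason of a flush task.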
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Global engine write buffer is full.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter the region.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush because the region is downgrading.
    Downgrading,
    /// Flush before entering the staging state.
    EnterStaging,
}

impl FlushReason {
    /// Converts the reason to a static string for metric labels.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

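/// Task to flush the immutable memtables of a region into SST files.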
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options of the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore that limits concurrent SST write tasks.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in the staging state.
    pub(crate) is_staging: bool,
    /// Partition expression of the region, carried over to the flushed files.
    pub(crate) partition_expr: Option<String>,
}

impl RegionFlushTask {
    /// Pushes a new flush result sender to the task.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all senders of the success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiting senders.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Take a snapshot of the version before creating the job so the flush
        // operates on a consistent view of the region.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the region worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Don't record the elapsed time on failure.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables into SSTs and applies the resulting edit to
    /// the region manifest.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // Persist the edit to the manifest under the expected region state.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes each immutable memtable into SST files and collects the file metas.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let compact_start = std::time::Instant::now();
            // Compact the memtable before flushing; a failure here is not fatal.
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();

            let num_mem_rows = mem_ranges.num_rows();
            let memtable_series_count = mem_ranges.series_count();
            let memtable_id = mem.id();
            series_count += memtable_series_count;

            if mem_ranges.is_record_batch() {
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (max_sequence, ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // Nothing written for this source.
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;

                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                if ssts_written.is_empty() {
                    // Nothing written for this memtable.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }

    /// Flushes flat (record batch) memtable ranges concurrently, limited by the
    /// flush semaphore, and returns the per-source results.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        for (source, max_sequence) in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }

    /// Builds a [FileMeta] for a newly flushed level 0 SST.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    /// Creates an SST write request for the given source.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            #[cfg(feature = "vector_index")]
            vector_index_config: self.engine_config.vector_index.clone(),
        }
    }

    /// Notifies the region worker of the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task for the same region into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Currently only the senders are merged; other fields of `other` are discarded.
        self.senders.append(&mut other.senders);
    }
}

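/// Result of flushing flat memtable ranges concurrently.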
struct FlushFlatMemResult {
    num_encoded: usize,
    num_sources: usize,
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}

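/// Result of flushing all immutable memtables of a region.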
struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    flush_metrics: Metrics,
}

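/// Builds a [Source] from memtable ranges, merging and deduplicating rows when
/// there is more than one range.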
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            Box::new(merge_reader) as _
        } else {
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
                }
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    merge_reader,
                    LastNonNull::new(false),
                    None,
                )) as _,
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

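/// Flat sources built from memtable ranges: iterator sources to encode and
/// already-encoded parts to upload directly.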
struct FlatSources {
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}

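/// Groups memtable ranges into [FlatSources].
///
/// Already encoded ranges are passed through as-is. The remaining ranges are
/// batched into merge-and-dedup iterators of roughly `total_rows / 8` rows each
/// (at least `DEFAULT_ROW_GROUP_SIZE`) so a memtable can be flushed by multiple
/// concurrent tasks.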
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            // A single range may still contain duplicate rows, so it goes through
            // the dedup iterator unless the region is in append mode.
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        // Compute the minimum number of rows each flush source should take.
        let total_rows: usize = ranges.values().map(|r| r.stats().num_rows()).sum();
        let min_flush_rows = total_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();
            current_ranges.push(range);

            if last_iter_rows > min_flush_rows {
                // Use the max sequence of the ranges batched so far.
                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Build a source from the remaining iterators.
        if !input_iters.is_empty() {
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}

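/// Merges the input iterators into one record batch iterator and wraps it with a
/// dedup iterator according to the merge mode, unless `append_mode` is true.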
pub fn merge_and_dedup(
    schema: &SchemaRef,
    append_mode: bool,
    merge_mode: MergeMode,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if append_mode {
        Box::new(merge_iter) as _
    } else {
        match merge_mode {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}

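/// Wraps a single record batch iterator with a dedup iterator according to the
/// merge mode, or returns it unchanged in append mode.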
pub fn maybe_dedup_one(
    append_mode: bool,
    merge_mode: MergeMode,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if append_mode {
        input_iter
    } else {
        match merge_mode {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

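/// Flush scheduler of a region worker.
///
/// Tracks flush requests per region and merges or queues them so that only one
/// flush job per region runs at a time.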
pub(crate) struct FlushScheduler {
    region_status: HashMap<RegionId, FlushStatus>,
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already has a flush task running or queued.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Freezes the mutable memtable and schedules the flush job.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        // Submit the flush job to the background scheduler.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush task for the region.
    ///
    /// If the region already has a flush in progress, the task is merged into the
    /// pending task instead of being scheduled immediately.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Merge the task into the pending task if a flush is already running.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Track the new flush so later requests are queued behind it.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job of the region has finished.
    ///
    /// Returns the pending requests of the region if there is no more pending
    /// flush task for it.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The pending task has nothing to flush; finish it directly.
            let task = flush_status.pending_task.take().unwrap();
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        debug!("Scheduling pending flush task for region {}", region_id);
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        None
    }

    /// Notifies the scheduler that the flush job of the region has failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region and fail all pending requests.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue of the flushing region.
    ///
    /// Panics if the region doesn't have a flush in progress.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue of the flushing region.
    ///
    /// Panics if the region doesn't have a flush in progress.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue of the flushing region.
    ///
    /// Panics if the region doesn't have a flush in progress.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

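/// Flush status of a region tracked by the [FlushScheduler].
///
/// Holds the most recent pending flush task and the requests queued while the
/// region is flushing.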
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Most recent pending task; newly scheduled tasks are merged into it.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Pending bulk write requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task, or stores it if there is none.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and the pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The scheduler should freeze the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // The remaining tasks should be merged into the pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulate flush completion by removing the frozen memtable.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new job is scheduled since there is nothing left to flush.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Builds a bulk part with duplicate rows (same key and timestamp).
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Builds memtable ranges from a bulk memtable with the given append mode.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Dedup mode: duplicates should be removed.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Append mode: duplicates should be preserved.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedule the second task; it becomes the pending task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // The scheduler should freeze the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Remove the frozen memtable and write new rows so the pending task has
        // something to flush.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(2, job_scheduler.num_jobs());
        // The pending task has been scheduled.
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}