use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::{RegionId, SequenceNumber};
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
    RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
use crate::memtable::{
    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
///
/// Tracks memtable memory usage and decides whether the engine should flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes will be freed in the near future,
    /// e.g. when a mutable memtable is frozen for flushing.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are freed, e.g. after the flushed
    /// memtable is dropped.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable usage limit that triggers a flush.
    fn flush_limit(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Memory limit for mutable memtables.
    mutable_limit: usize,
    /// Memory in use, including both mutable and immutable memtables.
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free, i.e. used by mutable memtables.
    memory_active: AtomicUsize,
    /// Optional notifier that is signaled when memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the specific `global_write_buffer_size` in bytes.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Sets the notifier to signal when memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable limit of the `global_write_buffer_size`.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Reserves half of the write buffer for mutable memtables.
        global_write_buffer_size / 2
    }
}
impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage >= self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        if memory_usage >= self.global_write_buffer_size {
            return true;
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies the receiver that memory is freed. It is fine if no
            // one is listening, so the send result is ignored.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    fn flush_limit(&self) -> usize {
        self.mutable_limit
    }
}
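
// Illustrative accounting lifecycle (a sketch, not called by engine code):
// a write reserves memory, freezing a memtable schedules that memory to be
// freed, and a finished flush finally frees it. `memory_active` tracks only
// the mutable portion while `memory_used` tracks mutable plus immutable.
//
// let manager = WriteBufferManagerImpl::new(1000);
// manager.reserve_mem(600);       // memory_used = 600, memory_active = 600
// manager.schedule_free_mem(600); // memory_used = 600, memory_active = 0
// manager.free_mem(600);          // memory_used = 0,   memory_active = 0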

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memory usage of the engine reaches the limit.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter a table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush while downgrading the region leader.
    Downgrading,
    /// Flush before entering the staging state.
    EnterStaging,
    /// Flush before closing the region.
    Closing,
}

impl FlushReason {
    /// Returns the flush reason as a static str.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    pub(crate) index_options: IndexOptions,
    /// Semaphore to limit concurrent flush jobs.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in the staging state.
    pub(crate) is_staging: bool,
    /// Partition expression of the region, in serialized form.
    pub(crate) partition_expr: Option<String>,
}
impl RegionFlushTask {
    /// Pushes the sender to the task if it is not none.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all waiters that the flush succeeded.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiters.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before creating the job so the job
        // won't read an outdated version.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry id of the flushed data.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                    flush_reason: self.reason,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so a failed flush doesn't pollute the metric.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes memtables to level 0 SSTs and updates the region manifest.
    /// Returns the [RegionEdit] that this flush applies.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the memtable list and entry ids from `version_data` for consistency.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            // Uses the committed sequence as the flushed sequence.
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Keeps the staging state if the region is currently staging.
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // Note: if the manifest update fails, the newly flushed files are not
        // recorded by the manifest and are leaked.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes each immutable memtable and collects the written files.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut encoded_part_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
        for mem in memtables {
            if mem.is_empty() {
                // Skips empty memtables.
                continue;
            }

            // Compacts the memtable before reading it, best effort: a failure
            // here only loses the optimization.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.num_rows();
            let memtable_series_count = mem_ranges.series_count();
            let memtable_id = mem.id();
            series_count += memtable_series_count;

            if mem_ranges.is_record_batch() {
                // The flat path: ranges yield record batches directly.
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                encoded_part_count += num_encoded;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (max_sequence, ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // No data written, skips this source.
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;
                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts_written.len() as u64);
                if ssts_written.is_empty() {
                    // No data written, skips this memtable.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        })
    }

    /// Flushes flat memtable ranges, writing each merged source and each
    /// pre-encoded part as SSTs concurrently.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        for (source, max_sequence) in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                // Safety: the semaphore is never closed.
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                // Safety: the semaphore is never closed.
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }

    /// Builds a level 0 [FileMeta] from the SST info.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    /// Creates a [SstWriteRequest] to flush the given source.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            #[cfg(feature = "vector_index")]
            vector_index_config: self.engine_config.vector_index.clone(),
        }
    }

    /// Notifies the region worker about the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // We only merge senders; both tasks flush the same region.
        self.senders.append(&mut other.senders);
    }
}

struct FlushFlatMemResult {
    num_encoded: usize,
    num_sources: usize,
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}

struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    encoded_part_count: usize,
    flush_metrics: Metrics,
}

/// Builds a [Source] from the memtable ranges, merging (and deduplicating
/// unless in append mode) when there is more than one range.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        // Safety: the length check above guarantees one range exists.
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        // Multiple ranges may contain overlapping keys, so merge them first.
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            // The append mode keeps duplicate rows.
            Box::new(merge_reader) as _
        } else {
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
                }
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    merge_reader,
                    LastNonNull::new(false),
                    None,
                )) as _,
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

/// Flat sources built from memtable ranges: iterators to write and parts
/// that are already encoded.
struct FlatSources {
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}

/// Builds [FlatSources] from the memtable ranges.
///
/// Already encoded ranges are written as-is. The remaining ranges are grouped
/// into batches of at least `ENCODE_ROW_THRESHOLD` rows, and each batch is
/// merged (and deduplicated unless in append mode) into one flat source.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        debug!("Flushing single flat range");

        // Safety: the length check above guarantees one range exists.
        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            // A single range doesn't need merging, only deduplication.
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
        // Counts rows of the ranges that are not encoded yet.
        let total_rows: usize = ranges
            .values()
            .filter(|r| r.encoded().is_none())
            .map(|r| r.num_rows())
            .sum();
        debug!(
            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
            total_rows,
            min_flush_rows,
            ranges.len()
        );
        let mut rows_remaining = total_rows;
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            let range_rows = range.num_rows();
            last_iter_rows += range_rows;
            rows_remaining -= range_rows;
            current_ranges.push(range);

            // Cuts a batch once it is large enough, unless the leftover would
            // be too small to fill a row group (then keep accumulating).
            if last_iter_rows >= min_flush_rows
                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
            {
                debug!(
                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                    last_iter_rows,
                    min_flush_rows,
                    input_iters.len(),
                    rows_remaining
                );

                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Flushes the leftover iterators as the last source.
        if !input_iters.is_empty() {
            debug!(
                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                last_iter_rows,
                min_flush_rows,
                input_iters.len(),
                rows_remaining
            );
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}
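
// A worked example of the batching condition above, assuming
// ENCODE_ROW_THRESHOLD = 100_000 and DEFAULT_ROW_GROUP_SIZE = 102_400 (the
// actual values live in the memtable and parquet modules):
// - three unencoded ranges of 60_000, 70_000 and 30_000 rows, total 160_000;
// - after range 1: last_iter_rows = 60_000 < min_flush_rows, keep buffering;
// - after range 2: last_iter_rows = 130_000 >= min_flush_rows, but
//   rows_remaining = 30_000 is neither 0 nor >= DEFAULT_ROW_GROUP_SIZE, so we
//   still buffer to avoid a tiny trailing SST;
// - after range 3: rows_remaining = 0, so all 160_000 rows become one source.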

/// Merges `input_iters` into a single sorted iterator and deduplicates it
/// according to the merge mode, unless `append_mode` is true.
pub fn merge_and_dedup(
    schema: &SchemaRef,
    append_mode: bool,
    merge_mode: MergeMode,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if append_mode {
        // The append mode keeps duplicate rows.
        Box::new(merge_iter) as _
    } else {
        match merge_mode {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}
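
// Minimal usage sketch (hypothetical `schema`, `primary_key_len` and `iters`;
// in this module the iterators come from `build_record_batch_iter`, see
// `memtable_flat_sources` above):
//
// let merged = merge_and_dedup(
//     &schema,
//     /* append_mode */ false,
//     MergeMode::LastRow,
//     /* field_column_start */ primary_key_len,
//     iters,
// )?;
// for batch in merged {
//     // Each batch is merged across all inputs and deduplicated by key.
//     let batch = batch?;
// }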

/// Deduplicates a single sorted iterator according to the merge mode, unless
/// `append_mode` is true.
pub fn maybe_dedup_one(
    append_mode: bool,
    merge_mode: MergeMode,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if append_mode {
        // The append mode keeps duplicate rows.
        input_iter
    } else {
        match merge_mode {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

/// Tracks and manages flush tasks of regions.
pub(crate) struct FlushScheduler {
    /// Regions that have flush tasks.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // Freezes the mutable memtable so the flush job reads a stable snapshot.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush `task` for the region. If the region is already
    /// flushing, merges the task into the pending one.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // The region is already flushing; keep the new task as pending.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job for the region is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            // Safety: we just looked the status up, so it must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The pending task has nothing to flush.
            // Safety: `pending_task` is checked above.
            let task = flush_status.pending_task.take().unwrap();
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        debug!("Scheduling pending flush task for region {}", region_id);
        // Safety: `pending_task` is checked above.
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        None
    }

    /// Notifies the scheduler that the flush job for the region failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Removes this region and fails all pending requests.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is dropped, fail all pending requests.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks the pending flush task and pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Pending bulk write requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task, or stores it as the pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
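
    // An added check. It relies on strum's default `IntoStaticStr` behavior,
    // which serializes a variant as its own name.
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("Manual", FlushReason::Manual.as_str());
        assert_eq!("Periodically", FlushReason::Periodically.as_str());
    }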

    #[test]
    fn test_over_mutable_limit() {
        // The mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Exceeds the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Mutable memory is scheduled to free.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Frees memory.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More mutable memory is scheduled to free.
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Usage is still above the global limit.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
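
    // An added sanity check for the accessors: the flush limit is half of the
    // global buffer size and `mutable_usage` only tracks active bytes.
    #[test]
    fn test_flush_limit_and_mutable_usage() {
        let manager = WriteBufferManagerImpl::new(1000);
        assert_eq!(500, manager.flush_limit());
        manager.reserve_mem(300);
        assert_eq!(300, manager.mutable_usage());
        manager.schedule_free_mem(300);
        assert_eq!(0, manager.mutable_usage());
        assert_eq!(300, manager.memory_usage());
    }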

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The region has nothing to flush, so the task returns immediately.
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so that the memtable is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedules the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks; they are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Removes the first memtable, as if it was flushed.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush job is scheduled since there is nothing left to flush.
        assert_eq!(1, job_scheduler.num_jobs());
        // The pending tasks are notified.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    /// Builds a bulk part with duplicate keys and checks that a single flat
    /// range is deduplicated unless append mode is enabled.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Builds a bulk part with two rows sharing the same key and timestamp.
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Dedup is enabled: the duplicate row is removed.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Append mode: duplicates are preserved.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so that the memtable is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 2 tasks.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedules the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedules the second task; it becomes the pending task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // The mutable memtable is frozen.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Removes the first memtable, as if it was flushed.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        // Writes more data so the pending task has something to flush.
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        // The pending task is scheduled as a new flush job.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}