1use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::region_request::RegionFlushReason;
30use store_api::storage::{RegionId, SequenceNumber};
31use strum::IntoStaticStr;
32use tokio::sync::{Semaphore, mpsc, watch};
33
34use crate::access_layer::{
35 AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
36};
37use crate::cache::CacheManagerRef;
38use crate::config::MitoConfig;
39use crate::engine::region_hook::SstFileInfo;
40use crate::error::{
41 Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
42 RegionTruncatedSnafu, Result,
43};
44use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
45use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
46use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
47use crate::metrics::{
48 FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
49 INFLIGHT_FLUSH_COUNT,
50};
51use crate::read::FlatSource;
52use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
53use crate::read::flat_merge::FlatMergeIterator;
54use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
55use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
56use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
57use crate::request::{
58 BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
59 SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
60};
61use crate::schedule::scheduler::{Job, SchedulerRef};
62use crate::sst::file::FileMeta;
63use crate::sst::parquet::metadata::extract_primary_key_range;
64use crate::sst::parquet::{
65 DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
66};
67use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
68use crate::worker::WorkerListener;
69
/// Tracks memory used by memtables and decides when the engine should flush or stall.
///
/// Implementations must be shareable across threads (`Send + Sync`); see
/// [`WriteBufferManagerRef`].
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    ///
    /// `WriteBufferManagerImpl` returns true once the mutable (active) usage reaches its
    /// mutable limit, or once the total usage reaches the global write buffer size.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether writes should be stalled.
    ///
    /// `WriteBufferManagerImpl` returns true when the total usage is at or above the global
    /// write buffer size. (NOTE(review): presumably callers hold back writes while this is
    /// true — confirm at the call sites.)
    fn should_stall(&self) -> bool;

    /// Accounts for `mem` additional bytes.
    ///
    /// `WriteBufferManagerImpl` adds them to both the total and the mutable (active) usage.
    fn reserve_mem(&self, mem: usize);

    /// Marks `mem` bytes as scheduled to be freed.
    ///
    /// `WriteBufferManagerImpl` subtracts them from the mutable (active) usage only; they stay
    /// counted in [`memory_usage`](Self::memory_usage) until [`free_mem`](Self::free_mem) is
    /// called.
    fn schedule_free_mem(&self, mem: usize);

    /// Releases `mem` bytes from the total usage.
    ///
    /// `WriteBufferManagerImpl` also signals its optional notifier.
    fn free_mem(&self, mem: usize);

    /// Returns the total number of bytes reserved and not yet freed.
    fn memory_usage(&self) -> usize;

    /// Returns the usage threshold used to decide on a flush.
    ///
    /// `WriteBufferManagerImpl` returns its mutable limit.
    fn flush_limit(&self) -> usize;
}

/// Shared, thread-safe handle to a [`WriteBufferManager`].
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
103
/// Default [`WriteBufferManager`] backed by two atomic counters.
///
/// `memory_used` tracks every reserved byte until it is freed. `memory_active` tracks reserved
/// bytes that have not yet been scheduled for freeing (the mutable part).
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global budget in bytes; total usage at or above this triggers flush and stall.
    global_write_buffer_size: usize,
    /// Budget for mutable usage; half of `global_write_buffer_size` (see `get_mutable_limit`).
    mutable_limit: usize,
    /// Bytes reserved and not yet freed.
    memory_used: AtomicUsize,
    /// Bytes reserved and not yet scheduled for freeing.
    memory_active: AtomicUsize,
    /// Optional channel signalled every time memory is freed.
    notifier: Option<watch::Sender<()>>,
}
122
123impl WriteBufferManagerImpl {
124 pub fn new(global_write_buffer_size: usize) -> Self {
126 Self {
127 global_write_buffer_size,
128 mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
129 memory_used: AtomicUsize::new(0),
130 memory_active: AtomicUsize::new(0),
131 notifier: None,
132 }
133 }
134
135 pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
137 self.notifier = Some(notifier);
138 self
139 }
140
141 pub fn mutable_usage(&self) -> usize {
143 self.memory_active.load(Ordering::Relaxed)
144 }
145
146 fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
148 global_write_buffer_size / 2
150 }
151}
152
153impl WriteBufferManager for WriteBufferManagerImpl {
154 fn should_flush_engine(&self) -> bool {
155 let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
156 if mutable_memtable_memory_usage >= self.mutable_limit {
157 debug!(
158 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
159 mutable_memtable_memory_usage,
160 self.memory_usage(),
161 self.mutable_limit,
162 self.global_write_buffer_size,
163 );
164 return true;
165 }
166
167 let memory_usage = self.memory_used.load(Ordering::Relaxed);
168 if memory_usage >= self.global_write_buffer_size {
169 return true;
170 }
171
172 false
173 }
174
175 fn should_stall(&self) -> bool {
176 self.memory_usage() >= self.global_write_buffer_size
177 }
178
179 fn reserve_mem(&self, mem: usize) {
180 self.memory_used.fetch_add(mem, Ordering::Relaxed);
181 self.memory_active.fetch_add(mem, Ordering::Relaxed);
182 }
183
184 fn schedule_free_mem(&self, mem: usize) {
185 self.memory_active.fetch_sub(mem, Ordering::Relaxed);
186 }
187
188 fn free_mem(&self, mem: usize) {
189 self.memory_used.fetch_sub(mem, Ordering::Relaxed);
190 if let Some(notifier) = &self.notifier {
191 let _ = notifier.send(());
195 }
196 }
197
198 fn memory_usage(&self) -> usize {
199 self.memory_used.load(Ordering::Relaxed)
200 }
201
202 fn flush_limit(&self) -> usize {
203 self.mutable_limit
204 }
205}
206
/// Why a region flush was requested.
///
/// The variant name (obtained through `strum::IntoStaticStr`, see `as_str`) is used as the
/// label of the flush request counter and in flush log messages, so renaming a variant changes
/// those outputs.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Flush requested because the engine's write buffer is full.
    /// NOTE(review): presumably driven by `WriteBufferManager::should_flush_engine`; confirm.
    EngineFull,
    /// Flush requested explicitly. NOTE(review): presumably by a user/admin request.
    Manual,
    /// Flush requested before altering the region (inferred from the name; confirm).
    Alter,
    /// Periodic flush (inferred from the name; confirm the trigger).
    Periodically,
    /// Flush while the region is downgrading; the manifest update then expects the
    /// `Downgrading` leader state.
    Downgrading,
    /// Flush before the region enters staging (inferred from the name; confirm).
    EnterStaging,
    /// Flush requested while closing the region (mapped from `RegionFlushReason::Closing`).
    Closing,
    /// Flush requested for region migration (mapped from `RegionFlushReason::RegionMigration`).
    RegionMigration,
    /// Flush requested for repartitioning (mapped from `RegionFlushReason::Repartition`).
    Repartition,
    /// Flush requested to prune the remote WAL (mapped from `RegionFlushReason::RemoteWalPrune`).
    RemoteWalPrune,
}
231
232impl FlushReason {
233 fn as_str(&self) -> &'static str {
235 self.into()
236 }
237}
238
239impl From<RegionFlushReason> for FlushReason {
240 fn from(reason: RegionFlushReason) -> Self {
241 match reason {
242 RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
243 RegionFlushReason::Repartition => FlushReason::Repartition,
244 RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
245 RegionFlushReason::Closing => FlushReason::Closing,
246 RegionFlushReason::Downgrading => FlushReason::Downgrading,
247 }
248 }
249}
250
/// A request to flush the immutable memtables of one region, plus everything needed to run it.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Why the flush was requested; also used as a metric label and in logs.
    pub(crate) reason: FlushReason,
    /// Waiters notified with the flush outcome (directly on failure, or handed to the worker
    /// inside `FlushFinished` on success).
    pub(crate) senders: Vec<OutputTx>,
    /// Channel used to report `FlushFinished` / `FlushFailed` back to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Used to write new SSTs and upload pre-encoded parts.
    pub(crate) access_layer: AccessLayerRef,
    /// Receives `on_flush_begin` when the flush job starts.
    pub(crate) listener: WorkerListener,
    /// Engine configuration (SST write buffer size, default format, index configs).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the default SST row group size when set.
    pub(crate) row_group_size: Option<usize>,
    /// Passed to SST writes and uploads.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context used for the SST hook, the current region state and the manifest update.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options copied into every SST write request.
    pub(crate) index_options: IndexOptions,
    /// Bounds concurrent SST write/upload tasks; each task holds one permit while running.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the manifest update is made in staging mode.
    pub(crate) is_staging: bool,
    /// Serialized partition expression, parsed and stored in each new file's metadata.
    pub(crate) partition_expr: Option<String>,
}
280
281impl RegionFlushTask {
282 pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
284 if let Some(sender) = sender.take_inner() {
285 self.senders.push(sender);
286 }
287 }
288
289 fn on_success(self) {
291 for sender in self.senders {
292 sender.send(Ok(0));
293 }
294 }
295
296 fn on_failure(&mut self, err: Arc<Error>) {
298 for sender in self.senders.drain(..) {
299 sender.send(Err(err.clone()).context(FlushRegionSnafu {
300 region_id: self.region_id,
301 }));
302 }
303 }
304
305 fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
309 let version_data = version_control.current();
312
313 Box::pin(async move {
314 INFLIGHT_FLUSH_COUNT.inc();
315 self.do_flush(version_data).await;
316 INFLIGHT_FLUSH_COUNT.dec();
317 })
318 }
319
320 async fn do_flush(&mut self, version_data: VersionControlData) {
322 let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
323 self.listener.on_flush_begin(self.region_id).await;
324
325 let worker_request = match self.flush_memtables(&version_data).await {
326 Ok(edit) => {
327 let memtables_to_remove = version_data
328 .version
329 .memtables
330 .immutables()
331 .iter()
332 .map(|m| m.id())
333 .collect();
334 let flush_finished = FlushFinished {
335 region_id: self.region_id,
336 flushed_entry_id: version_data.last_entry_id,
338 senders: std::mem::take(&mut self.senders),
339 _timer: timer,
340 edit,
341 memtables_to_remove,
342 is_staging: self.is_staging,
343 flush_reason: self.reason,
344 };
345 WorkerRequest::Background {
346 region_id: self.region_id,
347 notify: BackgroundNotify::FlushFinished(flush_finished),
348 }
349 }
350 Err(e) => {
351 error!(e; "Failed to flush region {}", self.region_id);
352 timer.stop_and_discard();
354
355 let err = Arc::new(e);
356 self.on_failure(err.clone());
357 WorkerRequest::Background {
358 region_id: self.region_id,
359 notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
360 }
361 }
362 };
363 self.send_worker_request(worker_request).await;
364 }
365
366 async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
369 let version = &version_data.version;
372 let timer = FLUSH_ELAPSED
373 .with_label_values(&["flush_memtables"])
374 .start_timer();
375
376 let mut write_opts = WriteOptions {
377 write_buffer_size: self.engine_config.sst_write_buffer_size,
378 ..Default::default()
379 };
380 if let Some(row_group_size) = self.row_group_size {
381 write_opts.row_group_size = row_group_size;
382 }
383
384 let DoFlushMemtablesResult {
385 file_metas,
386 flushed_bytes,
387 series_count,
388 encoded_part_count,
389 flush_metrics,
390 sst_infos,
391 } = self.do_flush_memtables(version, write_opts).await?;
392
393 if !file_metas.is_empty() {
394 FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
395 }
396
397 let mut file_ids = Vec::with_capacity(file_metas.len());
398 let mut total_rows = 0;
399 let mut total_bytes = 0;
400 for meta in &file_metas {
401 file_ids.push(meta.file_id);
402 total_rows += meta.num_rows;
403 total_bytes += meta.file_size;
404 }
405 info!(
406 "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
407 self.region_id,
408 self.reason.as_str(),
409 file_ids,
410 series_count,
411 total_rows,
412 total_bytes,
413 timer.stop_and_record(),
414 encoded_part_count,
415 flush_metrics,
416 );
417 flush_metrics.observe();
418
419 let hook = self.manifest_ctx.hook();
420 if let Some(hook) = &hook {
421 let files: Vec<SstFileInfo<'_>> = sst_infos
422 .iter()
423 .zip(file_metas.iter())
424 .map(|(sst_info, file_meta)| SstFileInfo {
425 sst_info_ref: sst_info,
426 file_meta,
427 })
428 .collect();
429 hook.on_sst_files_written(self.region_id, &version.metadata, &files)
430 .await;
431 }
432
433 let edit = RegionEdit {
434 files_to_add: file_metas,
435 files_to_remove: Vec::new(),
436 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
437 compaction_time_window: None,
438 flushed_entry_id: Some(version_data.last_entry_id),
440 flushed_sequence: Some(version_data.committed_sequence),
441 committed_sequence: None,
442 };
443 info!(
444 "Applying {edit:?} to region {}, is_staging: {}",
445 self.region_id, self.is_staging
446 );
447
448 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
449
450 let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
451 RegionLeaderState::Downgrading
452 } else {
453 let current_state = self.manifest_ctx.current_state();
455 if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
456 RegionLeaderState::Staging
457 } else {
458 RegionLeaderState::Writable
459 }
460 };
461 let manifest_version = self
464 .manifest_ctx
465 .update_manifest(expected_state, action_list, self.is_staging)
466 .await?;
467 info!(
468 "Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}",
469 self.region_id,
470 self.is_staging,
471 self.reason.as_str()
472 );
473
474 Ok(edit)
475 }
476
477 async fn do_flush_memtables(
478 &self,
479 version: &VersionRef,
480 write_opts: WriteOptions,
481 ) -> Result<DoFlushMemtablesResult> {
482 let memtables = version.memtables.immutables();
483 let mut file_metas = Vec::with_capacity(memtables.len());
484 let mut flushed_bytes = 0;
485 let mut series_count = 0;
486 let mut encoded_part_count = 0;
487 let mut flush_metrics = Metrics::new(WriteType::Flush);
488 let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
489 let hook = self.manifest_ctx.hook();
490 let mut all_sst_infos = Vec::new();
491 for mem in memtables {
492 if mem.is_empty() {
493 continue;
495 }
496
497 let compact_start = std::time::Instant::now();
499 if let Err(e) = mem.compact(true) {
500 common_telemetry::error!(e; "Failed to compact memtable before flush");
501 }
502 let compact_cost = compact_start.elapsed();
503 flush_metrics.compact_memtable += compact_cost;
504
505 let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
507 let num_mem_ranges = mem_ranges.ranges.len();
508
509 let num_mem_rows = mem_ranges.num_rows();
511 let memtable_series_count = mem_ranges.series_count();
512 let memtable_id = mem.id();
513 series_count += memtable_series_count;
516
517 let flush_start = Instant::now();
518 let FlushFlatMemResult {
519 num_encoded,
520 num_sources,
521 results,
522 } = self
523 .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
524 .await?;
525 encoded_part_count += num_encoded;
526 for (source_idx, result) in results.into_iter().enumerate() {
527 let (max_sequence, ssts_written, metrics) = result?;
528 if ssts_written.is_empty() {
529 continue;
531 }
532
533 common_telemetry::debug!(
534 "Region {} flush one memtable {} {}/{}, metrics: {:?}",
535 self.region_id,
536 memtable_id,
537 source_idx,
538 num_sources,
539 metrics
540 );
541
542 flush_metrics = flush_metrics.merge(metrics);
543
544 for sst_info in &ssts_written {
545 flushed_bytes += sst_info.file_size;
546 let pk_range = sst_info
547 .file_metadata
548 .as_ref()
549 .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
550 file_metas.push(Self::new_file_meta(
551 self.region_id,
552 max_sequence,
553 sst_info,
554 partition_expr.clone(),
555 pk_range,
556 ));
557 }
558 if hook.is_some() {
559 all_sst_infos.extend(ssts_written);
560 }
561 }
562
563 common_telemetry::debug!(
564 "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
565 self.region_id,
566 num_sources,
567 memtable_id,
568 num_mem_ranges,
569 num_encoded,
570 num_mem_rows,
571 flush_start.elapsed(),
572 compact_cost,
573 );
574 }
575
576 Ok(DoFlushMemtablesResult {
577 file_metas,
578 flushed_bytes,
579 series_count,
580 encoded_part_count,
581 flush_metrics,
582 sst_infos: all_sst_infos,
583 })
584 }
585
586 async fn flush_flat_mem_ranges(
587 &self,
588 version: &VersionRef,
589 write_opts: &WriteOptions,
590 mem_ranges: MemtableRanges,
591 ) -> Result<FlushFlatMemResult> {
592 let batch_schema = to_flat_sst_arrow_schema(
593 &version.metadata,
594 &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
595 );
596 let field_column_start =
597 flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
598 let flat_sources = memtable_flat_sources(
599 batch_schema,
600 mem_ranges,
601 &version.options,
602 field_column_start,
603 )?;
604 let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
605 let num_encoded = flat_sources.encoded.len();
606 for (source, max_sequence) in flat_sources.sources {
607 let write_request = self.new_write_request(version, max_sequence, source);
608 let access_layer = self.access_layer.clone();
609 let write_opts = write_opts.clone();
610 let semaphore = self.flush_semaphore.clone();
611 let task = common_runtime::spawn_global(async move {
612 let _permit = semaphore.acquire().await.unwrap();
613 let mut metrics = Metrics::new(WriteType::Flush);
614 let ssts = access_layer
615 .write_sst(write_request, &write_opts, &mut metrics)
616 .await?;
617 FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
618 Ok((max_sequence, ssts, metrics))
619 });
620 tasks.push(task);
621 }
622 for (encoded, max_sequence) in flat_sources.encoded {
623 let access_layer = self.access_layer.clone();
624 let cache_manager = self.cache_manager.clone();
625 let region_id = version.metadata.region_id;
626 let semaphore = self.flush_semaphore.clone();
627 let task = common_runtime::spawn_global(async move {
628 let _permit = semaphore.acquire().await.unwrap();
629 let metrics = access_layer
630 .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
631 .await?;
632 FLUSH_FILE_TOTAL.inc();
633 Ok((max_sequence, smallvec![encoded.sst_info], metrics))
634 });
635 tasks.push(task);
636 }
637 let num_sources = tasks.len();
638 let results = futures::future::try_join_all(tasks)
639 .await
640 .context(JoinSnafu)?;
641 Ok(FlushFlatMemResult {
642 num_encoded,
643 num_sources,
644 results,
645 })
646 }
647
648 fn new_file_meta(
649 region_id: RegionId,
650 max_sequence: u64,
651 sst_info: &SstInfo,
652 partition_expr: Option<PartitionExpr>,
653 primary_key_range: Option<(Bytes, Bytes)>,
654 ) -> FileMeta {
655 let (primary_key_min, primary_key_max) = match primary_key_range {
656 Some((min, max)) => (Some(min), Some(max)),
657 None => (None, None),
658 };
659 FileMeta {
660 region_id,
661 file_id: sst_info.file_id,
662 time_range: sst_info.time_range,
663 level: 0,
664 file_size: sst_info.file_size,
665 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
666 available_indexes: sst_info.index_metadata.build_available_indexes(),
667 indexes: sst_info.index_metadata.build_indexes(),
668 index_file_size: sst_info.index_metadata.file_size,
669 index_version: 0,
670 num_rows: sst_info.num_rows as u64,
671 num_row_groups: sst_info.num_row_groups,
672 sequence: NonZeroU64::new(max_sequence),
673 partition_expr,
674 num_series: sst_info.num_series,
675 primary_key_min,
676 primary_key_max,
677 }
678 }
679
680 fn new_write_request(
681 &self,
682 version: &VersionRef,
683 max_sequence: u64,
684 source: FlatSource,
685 ) -> SstWriteRequest {
686 let flat_format = version
687 .options
688 .sst_format
689 .map(|f| f == FormatType::Flat)
690 .unwrap_or(self.engine_config.default_flat_format);
691 SstWriteRequest {
692 op_type: OperationType::Flush,
693 metadata: version.metadata.clone(),
694 source,
695 cache_manager: self.cache_manager.clone(),
696 storage: version.options.storage.clone(),
697 max_sequence: Some(max_sequence),
698 sst_write_format: if flat_format {
699 FormatType::Flat
700 } else {
701 FormatType::PrimaryKey
702 },
703 index_options: self.index_options.clone(),
704 index_config: self.engine_config.index.clone(),
705 inverted_index_config: self.engine_config.inverted_index.clone(),
706 fulltext_index_config: self.engine_config.fulltext_index.clone(),
707 bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
708 #[cfg(feature = "vector_index")]
709 vector_index_config: self.engine_config.vector_index.clone(),
710 }
711 }
712
713 pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
715 if let Err(e) = self
716 .request_sender
717 .send(WorkerRequestWithTime::new(request))
718 .await
719 {
720 error!(
721 "Failed to notify flush job status for region {}, request: {:?}",
722 self.region_id, e.0
723 );
724 }
725 }
726
727 fn merge(&mut self, mut other: RegionFlushTask) {
729 assert_eq!(self.region_id, other.region_id);
730 self.senders.append(&mut other.senders);
732 }
733}
734
/// Outcome of writing the ranges of one memtable (see `flush_flat_mem_ranges`).
struct FlushFlatMemResult {
    /// Number of pre-encoded parts that were uploaded as-is.
    num_encoded: usize,
    /// Total number of write/upload tasks spawned (sources plus encoded parts).
    num_sources: usize,
    /// One result per task: max sequence of its input, SSTs written, and write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
740
/// Aggregated outcome of flushing all immutable memtables of a region.
struct DoFlushMemtablesResult {
    /// Metadata of every SST written, in write order.
    file_metas: Vec<FileMeta>,
    /// Sum of the sizes of all written SSTs.
    flushed_bytes: u64,
    /// Sum of the series counts reported by each memtable's ranges.
    series_count: usize,
    /// Number of pre-encoded parts uploaded as-is.
    encoded_part_count: usize,
    /// Merged write metrics of all tasks.
    flush_metrics: Metrics,
    /// SST infos aligned with `file_metas`; only collected when the region has an SST hook.
    sst_infos: Vec<SstInfo>,
}
749
/// Inputs derived from one memtable's ranges, each paired with its max sequence number.
struct FlatSources {
    /// Record batch sources that still need to be encoded into SSTs.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Ranges that are already encoded and are uploaded without re-encoding.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
754
755fn memtable_flat_sources(
757 schema: SchemaRef,
758 mem_ranges: MemtableRanges,
759 options: &RegionOptions,
760 field_column_start: usize,
761) -> Result<FlatSources> {
762 let MemtableRanges { ranges } = mem_ranges;
763 let mut flat_sources = FlatSources {
764 sources: SmallVec::new(),
765 encoded: SmallVec::new(),
766 };
767
768 if ranges.len() == 1 {
769 debug!("Flushing single flat range");
770
771 let only_range = ranges.into_values().next().unwrap();
772 let max_sequence = only_range.stats().max_sequence();
773 if let Some(encoded) = only_range.encoded() {
774 flat_sources.encoded.push((encoded, max_sequence));
775 } else {
776 let iter = only_range.build_record_batch_iter(None, None)?;
777 let iter = maybe_dedup_one(
780 options.append_mode,
781 options.merge_mode(),
782 field_column_start,
783 iter,
784 );
785 flat_sources
786 .sources
787 .push((FlatSource::new_iter(schema, iter), max_sequence));
788 };
789 } else {
790 let min_flush_rows = *ENCODE_ROW_THRESHOLD;
791 let total_rows: usize = ranges
793 .values()
794 .filter(|r| r.encoded().is_none())
795 .map(|r| r.num_rows())
796 .sum();
797 debug!(
798 "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
799 total_rows,
800 min_flush_rows,
801 ranges.len()
802 );
803 let mut rows_remaining = total_rows;
804 let mut last_iter_rows = 0;
805 let num_ranges = ranges.len();
806 let mut input_iters = Vec::with_capacity(num_ranges);
807 let mut current_ranges = Vec::new();
808 for (_range_id, range) in ranges {
809 if let Some(encoded) = range.encoded() {
810 let max_sequence = range.stats().max_sequence();
811 flat_sources.encoded.push((encoded, max_sequence));
812 continue;
813 }
814
815 let iter = range.build_record_batch_iter(None, None)?;
816 input_iters.push(iter);
817 let range_rows = range.num_rows();
818 last_iter_rows += range_rows;
819 rows_remaining -= range_rows;
820 current_ranges.push(range);
821
822 if last_iter_rows >= min_flush_rows
825 && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
826 {
827 debug!(
828 "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
829 last_iter_rows,
830 min_flush_rows,
831 input_iters.len(),
832 rows_remaining
833 );
834
835 let max_sequence = current_ranges
837 .iter()
838 .map(|r| r.stats().max_sequence())
839 .max()
840 .unwrap_or(0);
841
842 let maybe_dedup = merge_and_dedup(
843 &schema,
844 options.append_mode,
845 options.merge_mode(),
846 field_column_start,
847 std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
848 )?;
849
850 flat_sources.sources.push((
851 FlatSource::new_iter(schema.clone(), maybe_dedup),
852 max_sequence,
853 ));
854 last_iter_rows = 0;
855 current_ranges.clear();
856 }
857 }
858
859 if !input_iters.is_empty() {
861 debug!(
862 "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
863 last_iter_rows,
864 min_flush_rows,
865 input_iters.len(),
866 rows_remaining
867 );
868 let max_sequence = current_ranges
869 .iter()
870 .map(|r| r.stats().max_sequence())
871 .max()
872 .unwrap_or(0);
873
874 let maybe_dedup = merge_and_dedup(
875 &schema,
876 options.append_mode,
877 options.merge_mode(),
878 field_column_start,
879 input_iters,
880 )?;
881
882 flat_sources
883 .sources
884 .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
885 }
886 }
887
888 Ok(flat_sources)
889}
890
891pub fn merge_and_dedup(
936 schema: &SchemaRef,
937 append_mode: bool,
938 merge_mode: MergeMode,
939 field_column_start: usize,
940 input_iters: Vec<BoxedRecordBatchIterator>,
941) -> Result<BoxedRecordBatchIterator> {
942 let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
943 let maybe_dedup = if append_mode {
944 Box::new(merge_iter) as _
946 } else {
947 match merge_mode {
949 MergeMode::LastRow => {
950 Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
951 }
952 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
953 merge_iter,
954 FlatLastNonNull::new(field_column_start, false),
955 )) as _,
956 }
957 };
958 Ok(maybe_dedup)
959}
960
961pub fn maybe_dedup_one(
962 append_mode: bool,
963 merge_mode: MergeMode,
964 field_column_start: usize,
965 input_iter: BoxedRecordBatchIterator,
966) -> BoxedRecordBatchIterator {
967 if append_mode {
968 input_iter
970 } else {
971 match merge_mode {
973 MergeMode::LastRow => {
974 Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
975 }
976 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
977 input_iter,
978 FlatLastNonNull::new(field_column_start, false),
979 )),
980 }
981 }
982}
983
/// Schedules region flush jobs and tracks the per-region state of in-flight flushes.
pub(crate) struct FlushScheduler {
    /// Regions that have a flush scheduled, with their pending task and queued requests.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Scheduler that runs the flush jobs.
    scheduler: SchedulerRef,
}
991
992impl FlushScheduler {
993 pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
995 FlushScheduler {
996 region_status: HashMap::new(),
997 scheduler,
998 }
999 }
1000
1001 pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
1003 self.region_status.contains_key(®ion_id)
1004 }
1005
1006 fn schedule_flush_task(
1007 &mut self,
1008 version_control: &VersionControlRef,
1009 task: RegionFlushTask,
1010 ) -> Result<()> {
1011 let region_id = task.region_id;
1012
1013 if let Err(e) = version_control.freeze_mutable() {
1015 error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
1016
1017 return Err(e);
1018 }
1019 let job = task.into_flush_job(version_control);
1021 if let Err(e) = self.scheduler.schedule(job) {
1022 error!(e; "Failed to schedule flush job for region {}", region_id);
1025
1026 return Err(e);
1027 }
1028 Ok(())
1029 }
1030
1031 pub(crate) fn schedule_flush(
1033 &mut self,
1034 region_id: RegionId,
1035 version_control: &VersionControlRef,
1036 task: RegionFlushTask,
1037 ) -> Result<()> {
1038 debug_assert_eq!(region_id, task.region_id);
1039
1040 let version = version_control.current().version;
1041 if version.memtables.is_empty() {
1042 debug_assert!(!self.region_status.contains_key(®ion_id));
1043 task.on_success();
1045 return Ok(());
1046 }
1047
1048 FLUSH_REQUESTS_TOTAL
1050 .with_label_values(&[task.reason.as_str()])
1051 .inc();
1052
1053 if let Some(flush_status) = self.region_status.get_mut(®ion_id) {
1055 debug!("Merging flush task for region {}", region_id);
1057 flush_status.merge_task(task);
1058 return Ok(());
1059 }
1060
1061 self.schedule_flush_task(version_control, task)?;
1062
1063 let _ = self.region_status.insert(
1065 region_id,
1066 FlushStatus::new(region_id, version_control.clone()),
1067 );
1068
1069 Ok(())
1070 }
1071
1072 pub(crate) fn on_flush_success(
1076 &mut self,
1077 region_id: RegionId,
1078 ) -> Option<(
1079 Vec<SenderDdlRequest>,
1080 Vec<SenderWriteRequest>,
1081 Vec<SenderBulkRequest>,
1082 )> {
1083 let flush_status = self.region_status.get_mut(®ion_id)?;
1084 if flush_status.pending_task.is_none() {
1086 debug!(
1089 "Region {} doesn't have any pending flush task, removing it from the status",
1090 region_id
1091 );
1092 let flush_status = self.region_status.remove(®ion_id).unwrap();
1093 return Some((
1094 flush_status.pending_ddls,
1095 flush_status.pending_writes,
1096 flush_status.pending_bulk_writes,
1097 ));
1098 }
1099
1100 let version_data = flush_status.version_control.current();
1102 if version_data.version.memtables.is_empty() {
1103 let task = flush_status.pending_task.take().unwrap();
1106 task.on_success();
1108 debug!(
1109 "Region {} has nothing to flush, removing it from the status",
1110 region_id
1111 );
1112 let flush_status = self.region_status.remove(®ion_id).unwrap();
1114 return Some((
1115 flush_status.pending_ddls,
1116 flush_status.pending_writes,
1117 flush_status.pending_bulk_writes,
1118 ));
1119 }
1120
1121 debug!("Scheduling pending flush task for region {}", region_id);
1123 let task = flush_status.pending_task.take().unwrap();
1125 let version_control = flush_status.version_control.clone();
1126 if let Err(err) = self.schedule_flush_task(&version_control, task) {
1127 error!(
1128 err;
1129 "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1130 );
1131 }
1132 None
1134 }
1135
1136 pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1138 error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1139
1140 FLUSH_FAILURE_TOTAL.inc();
1141
1142 let Some(flush_status) = self.region_status.remove(®ion_id) else {
1144 return;
1145 };
1146
1147 flush_status.on_failure(err);
1149 }
1150
1151 pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1153 self.remove_region_on_failure(
1154 region_id,
1155 Arc::new(RegionDroppedSnafu { region_id }.build()),
1156 );
1157 }
1158
1159 pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1161 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1162 }
1163
1164 pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1166 self.remove_region_on_failure(
1167 region_id,
1168 Arc::new(RegionTruncatedSnafu { region_id }.build()),
1169 );
1170 }
1171
1172 fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1173 let Some(flush_status) = self.region_status.remove(®ion_id) else {
1175 return;
1176 };
1177
1178 flush_status.on_failure(err);
1180 }
1181
1182 pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1187 let status = self.region_status.get_mut(&request.region_id).unwrap();
1188 status.pending_ddls.push(request);
1189 }
1190
1191 pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1196 let status = self
1197 .region_status
1198 .get_mut(&request.request.region_id)
1199 .unwrap();
1200 status.pending_writes.push(request);
1201 }
1202
1203 pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1208 let status = self.region_status.get_mut(&request.region_id).unwrap();
1209 status.pending_bulk_writes.push(request);
1210 }
1211
1212 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1214 self.region_status
1215 .get(®ion_id)
1216 .map(|status| !status.pending_ddls.is_empty())
1217 .unwrap_or(false)
1218 }
1219}
1220
1221impl Drop for FlushScheduler {
1222 fn drop(&mut self) {
1223 for (region_id, flush_status) in self.region_status.drain() {
1224 flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1226 }
1227 }
1228}
1229
/// Per-region flush bookkeeping kept while a flush is in flight.
struct FlushStatus {
    /// Region being flushed.
    region_id: RegionId,
    /// Version control of the region, used to check for remaining memtables and to schedule
    /// the next flush.
    version_control: VersionControlRef,
    /// Flush requested while another flush was running; merged tasks share this slot.
    pending_task: Option<RegionFlushTask>,
    /// DDL requests waiting for the flush to finish.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests waiting for the flush to finish.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests waiting for the flush to finish.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1247
1248impl FlushStatus {
1249 fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1250 FlushStatus {
1251 region_id,
1252 version_control,
1253 pending_task: None,
1254 pending_ddls: Vec::new(),
1255 pending_writes: Vec::new(),
1256 pending_bulk_writes: Vec::new(),
1257 }
1258 }
1259
1260 fn merge_task(&mut self, task: RegionFlushTask) {
1262 if let Some(pending) = &mut self.pending_task {
1263 pending.merge(task);
1264 } else {
1265 self.pending_task = Some(task);
1266 }
1267 }
1268
1269 fn on_failure(self, err: Arc<Error>) {
1270 if let Some(mut task) = self.pending_task {
1271 task.on_failure(err.clone());
1272 }
1273 for ddl in self.pending_ddls {
1274 ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1275 region_id: self.region_id,
1276 }));
1277 }
1278 for write_req in self.pending_writes {
1279 write_req
1280 .sender
1281 .send(Err(err.clone()).context(FlushRegionSnafu {
1282 region_id: self.region_id,
1283 }));
1284 }
1285 }
1286}
1287
#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    /// The mutable limit is half of the global limit, rounded down.
    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    /// Engine flushes are requested once active (mutable) usage exceeds the mutable limit.
    #[test]
    fn test_over_mutable_limit() {
        // Global limit 1000; presumably mutable limit 500 (half, per `get_mutable_limit`).
        let manager = WriteBufferManagerImpl::new(1000);
        // 400 reserved: below both limits.
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // 800 reserved: above the mutable limit, so a flush is requested.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Scheduling 400 to be freed lowers active usage only; total usage stays at 800.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Actually freeing the memory lowers total usage; active usage is unchanged.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    /// Behavior while total usage exceeds the global limit.
    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        // 1100 reserved: above the global limit, so writes should stall.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Scheduled frees lower active usage (to 900) but not total usage (still 1100).
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Active usage is now 450, yet flush/stall stay true while total usage is 1100.
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Reserving more keeps requesting engine flushes.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    /// Only `free_mem` notifies the watcher; `reserve_mem` and `schedule_free_mem` do not.
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    /// Flushing a region with no written rows finishes at once.
    /// No status is tracked and the sender receives 0.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Manual,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Nothing to flush, so no status is kept for the region.
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    /// Flush tasks scheduled while a flush is running are merged into a pending task.
    /// They complete once the running flush succeeds and no data is left to flush.
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Use a time-series memtable so rows can be written to the version.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Write rows so the mutable memtable is not empty.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Build three identical flush tasks for the region.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Manual,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule the first task: one flush job is submitted and the region is tracked.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The memtable with id 0 has been turned into an immutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule the remaining tasks while the first flush is still running.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulate the flush job finishing: remove immutable memtable 0 from the version.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new job is submitted and the region is no longer tracked.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        // Every pending sender is answered with 0.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    /// Deduplication in flat flush sources for a single memtable range.
    /// With `append_mode` off, duplicates are collapsed; with `append_mode` on, they are kept.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let capacity = 16;
        // Build one bulk part holding two rows with the same key ("dup_key") and the
        // same timestamp (1000) but different field values.
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Writes the part into a fresh bulk memtable and returns its flush ranges.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Non-append mode with LastRow merge: the duplicates collapse to one row.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            // Count all rows produced by every source.
            for (source, _sequence) in flat_sources.sources {
                total_rows += source
                    .take_iter()
                    .map(|x| x.unwrap().num_rows())
                    .sum::<usize>();
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Append mode: both duplicate rows are kept.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                total_rows += source
                    .take_iter()
                    .map(|x| x.unwrap().num_rows())
                    .sum::<usize>();
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

    /// A pending flush task is submitted as a new job when the running flush succeeds
    /// and new data has been written.
    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Use a time-series memtable so rows can be written to the version.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Write rows so the mutable memtable is not empty.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Build two identical flush tasks for the region.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Manual,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // The first task submits a flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The second task is stored as the region's pending task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // The memtable with id 0 has been turned into an immutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Simulate the flush job finishing: remove immutable memtable 0 from the version.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        // Write more rows through the version captured before `apply_edit`. Presumably it
        // shares the mutable memtable with the new version, which gives the pending task
        // data to flush.
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        // The pending task is submitted as a second job and cleared from the status.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}