use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info, trace};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::PredicateGroup;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
///
/// Tracks memtable memory usage and decides whether the engine should flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether write requests should be stalled.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes for a mutable memtable.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are about to be freed: the memory
    /// is still in use, but a flush has been scheduled for it.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes have been freed (flushed to disk).
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

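/// Default [WriteBufferManager] implementation.
///
/// A sketch of the intended lifecycle (the sizes are illustrative, not the
/// engine defaults):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1000);
/// manager.reserve_mem(100);          // a write buffers 100 bytes
/// manager.schedule_free_mem(100);    // the memtable is frozen, a flush is scheduled
/// manager.free_mem(100);             // the flush finished, memory is reclaimed
/// assert_eq!(0, manager.memory_usage());
/// ```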
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Total write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Size limit for mutable memtables.
    mutable_limit: usize,
    /// Memory in use, including both mutable and immutable memtables.
    memory_used: AtomicUsize,
    /// Memory not yet scheduled to be freed, i.e. used by mutable memtables.
    memory_active: AtomicUsize,
    /// Optional notifier to wake up a worker once write buffer memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier to the manager.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the size limit for mutable memtables.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Reserves half of the write buffer for mutable memtables.
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the total usage exceeds the global limit, trigger a flush only when a
        // meaningful amount of memory is still mutable. If most of the memory is
        // already being flushed, triggering another flush would not help.
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
                );
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
                );
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies the worker that memory has been freed. The send only
            // fails when the receiver is dropped, i.e. the worker has already
            // stopped, so the error is ignored.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Memory usage of the engine reaches the flush threshold.
    EngineFull,
    /// Manually triggered flush.
    Manual,
    /// Flush to alter a table.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush while downgrading the region leader.
    Downgrading,
}

impl FlushReason {
    /// Returns the flush reason as a static string.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options of the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore that bounds the number of concurrent SST writes of a flush.
    pub(crate) flush_semaphore: Arc<Semaphore>,
}

impl RegionFlushTask {
    /// Pushes a new flush result sender if the sender is present.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all senders of success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiting senders.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Snapshot the version before creating the job so we flush exactly the
        // memtables visible at scheduling time.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so failed flushes don't pollute the elapsed metrics.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] that was applied.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // Use the memtable list and entry ids from `version_data` for consistency:
        // the version in the version control may change while we flush.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A downgrading region is still expected to be in the downgrading state;
        // otherwise keep the current staging state or expect a writable leader.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes each immutable memtable and collects the generated file metas.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let partition_expr = match &version.metadata.partition_expr {
            None => None,
            Some(json_expr) if json_expr.is_empty() => None,
            Some(json_str) => PartitionExpr::from_json_str(json_str)
                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
        };
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            // Compacts the memtable first to merge duplicate rows in memory.
            let compact_start = Instant::now();
            if let Err(e) = mem.compact(true) {
                error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.stats.num_rows();
            let memtable_id = mem.id();
            series_count += mem_ranges.stats.series_count();

            if mem_ranges.is_record_batch() {
                // A flat (record batch) memtable may produce multiple SSTs in parallel.
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    max_sequence,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // This source wrote no data.
                        continue;
                    }

                    debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.stats.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;

                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let (ssts_written, metrics) = self
                    .access_layer
                    .write_sst(write_request, &write_opts, WriteType::Flush)
                    .await?;
                if ssts_written.is_empty() {
                    // The memtable wrote no data.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id,
                    num_mem_ranges,
                    num_mem_rows,
                    metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            }
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }

    /// Flushes flat memtable ranges, writing the sources to SSTs in parallel
    /// under the flush semaphore.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        let max_sequence = flat_sources.max_sequence;
        for source in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                access_layer
                    .write_sst(write_request, &write_opts, WriteType::Flush)
                    .await
            });
            tasks.push(task);
        }
        // Ranges that are already encoded can be uploaded directly.
        for encoded in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            max_sequence,
            num_sources,
            results,
        })
    }

    /// Builds a level 0 [FileMeta] from the SST info.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
        }
    }

    /// Builds an [SstWriteRequest] to flush the given source.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
        }
    }

    /// Notifies the region worker of the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task of the same region into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Only the senders are merged; both tasks share a single flush run.
        self.senders.append(&mut other.senders);
    }
}

/// Result of flushing flat memtable ranges.
struct FlushFlatMemResult {
    num_encoded: usize,
    max_sequence: u64,
    num_sources: usize,
    results: Vec<Result<(SstInfoArray, Metrics)>>,
}

/// Result of flushing all immutable memtables.
struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    flush_metrics: Metrics,
}

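/// Builds a [Source] that reads rows from the given memtable ranges.
///
/// With a single range the iterator is used directly; otherwise the ranges are
/// merged and, unless the region is in append mode, deduplicated according to
/// the configured merge mode (defaulting to last-row wins).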
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        // A single range can be read directly.
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        // Multiple ranges: merge them, then dedup unless in append mode.
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            Box::new(merge_reader) as _
        } else {
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                }
                MergeMode::LastNonNull => {
                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                }
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

struct FlatSources {
    max_sequence: u64,
    sources: SmallVec<[FlatSource; 4]>,
    encoded: SmallVec<[EncodedRange; 4]>,
}

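/// Builds [FlatSources] from memtable ranges.
///
/// Ranges that already carry encoded SST data are passed through as-is. The
/// remaining ranges are grouped into merged (and, unless in append mode,
/// deduplicated) sources of roughly `max(total_rows / 8, DEFAULT_ROW_GROUP_SIZE)`
/// rows each, which caps the number of SSTs written concurrently for one
/// memtable at about 8. For example, with 1_600_000 input rows (assuming the
/// default row group size is below 200_000), a new source is cut each time the
/// accumulated iterators exceed 1_600_000 / 8 = 200_000 rows.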
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges, stats } = mem_ranges;
    let max_sequence = stats.max_sequence();
    let mut flat_sources = FlatSources {
        max_sequence,
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        let only_range = ranges.into_values().next().unwrap();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push(encoded);
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            flat_sources.sources.push(FlatSource::Iter(iter));
        }
    } else {
        // Cut a new source whenever the accumulated iterators exceed
        // `max(total_rows / 8, DEFAULT_ROW_GROUP_SIZE)` rows.
        let min_flush_rows = stats.num_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                flat_sources.encoded.push(encoded);
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();

            if last_iter_rows > min_flush_rows {
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options,
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
                last_iter_rows = 0;
            }
        }

        // Merges the remaining iterators.
        if !input_iters.is_empty() {
            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
        }
    }

    Ok(flat_sources)
}

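/// Merges `input_iters` into one sorted iterator and, unless the region is in
/// append mode, wraps it in a dedup iterator matching the region's merge mode:
/// `LastRow` keeps the latest duplicate row, `LastNonNull` merges the latest
/// non-null field values.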
fn merge_and_dedup(
    schema: &SchemaRef,
    options: &RegionOptions,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if options.append_mode {
        Box::new(merge_iter) as _
    } else {
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}

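/// Manages background flushes of a worker.
///
/// At most one flush job runs per region at a time: a task scheduled while the
/// region is already flushing is merged into the region's single pending task
/// and picked up by `schedule_next_flush` once the running job finishes.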
pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already has a flush requested.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for the given region.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush, just notify the senders.
            task.on_success();
            return Ok(());
        }

        // Only count flush requests that actually have something to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Add this region to the status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running, merge the task into the pending one.
            flush_status.merge_task(task);
            return Ok(());
        }

        if flush_status.pending_task.is_some() {
            // There is already a pending task, merge with it.
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job for the region succeeded.
    ///
    /// Returns the pending requests of the region if it no longer has a flush task.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // The region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task; remove its status.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing left to flush, notify the pending task directly.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // The region still has data to flush, keep it in the status map.
                None
            }
        };

        // Schedule the next flush job.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job for the region failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove the region and fail all of its pending tasks and requests.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);

        // Still try to schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after the flush of region {} failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region from the status map and fails its pending tasks with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task if any region has a pending task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(
            self.region_status
                .values()
                .all(|status| status.flushing || status.pending_task.is_some())
        );

        // Pick any region that has a pending task.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is shutting down, notify all pending tasks.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region tracked by the [FlushScheduler].
///
/// Holds the running and pending flush tasks and the pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush task is running.
    ///
    /// A region may have a pending task without a running one if the
    /// scheduler hasn't picked the region yet.
    flushing: bool,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests waiting to be executed after the flush.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests waiting to be executed after the flush.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task, or keeps it as the pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
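
    // A small sketch of the metric labels produced by `FlushReason::as_str`.
    // The exact strings are an assumption: with no strum rename attribute on
    // the enum, `IntoStaticStr` yields the verbatim variant names.
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }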

    #[test]
    fn test_over_mutable_limit() {
        // The mutable limit is 1000 / 2 = 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Mutable usage is now 800, over the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Scheduling a free of 400 brings the mutable usage back under the limit.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Freeing the memory reduces the total usage.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Mutable usage drops to 900 but is still over both limits.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Mutable usage drops to 450, less than half of the global limit:
        // flushing more wouldn't help, so no flush is triggered.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Mutable usage reaches 500, half of the global limit, again.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
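
    // Minimal sketch: `mutable_usage` tracks only the mutable part, so
    // scheduling a free shrinks it while the total `memory_usage` stays
    // unchanged until `free_mem` is called.
    #[test]
    fn test_mutable_usage() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(300);
        assert_eq!(300, manager.mutable_usage());
        manager.schedule_free_mem(100);
        assert_eq!(200, manager.mutable_usage());
        assert_eq!(300, manager.memory_usage());
    }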

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        // The region has nothing to flush: the task succeeds immediately and
        // no status is tracked for the region.
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder so we can write rows.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes some rows so the memtable is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 flush tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
            })
            .collect();
        // Schedules the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The scheduler should submit one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks; they are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Applies an empty edit and removes the immutable memtable, so the
        // pending task has nothing left to flush.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new job is submitted: the pending task is notified directly.
        assert_eq!(1, job_scheduler.num_jobs());
        // The region status is cleared.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}