1use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, trace};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34 AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39 Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
40 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::{
44 BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
45};
46use crate::metrics::{
47 FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
48 INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::read::merge::MergeReaderBuilder;
54use crate::read::{FlatSource, Source};
55use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
56use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
57use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
58use crate::request::{
59 BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
60 SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::FileMeta;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66use crate::worker::WorkerListener;
67
/// Tracks the global memory usage of write buffers (memtables) and decides
/// when the engine should flush regions or stall writes.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should schedule a flush of some region.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether writes should be stalled because the global write
    /// buffer limit has been reached.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes for an incoming write.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes will be freed soon (e.g. the data
    /// has been frozen and is scheduled for flush).
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are actually freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory currently tracked by the manager.
    fn memory_usage(&self) -> usize;
}
93
/// Shared reference to a [WriteBufferManager].
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
95
/// Default [WriteBufferManager] implementation backed by atomic counters.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global write buffer size limit in bytes.
    global_write_buffer_size: usize,
    /// Flush threshold for mutable memtables (half of the global limit).
    mutable_limit: usize,
    /// Total reserved bytes (mutable data plus data scheduled to be freed).
    memory_used: AtomicUsize,
    /// Bytes held by mutable (not yet scheduled for free) memtables.
    memory_active: AtomicUsize,
    /// Optional channel notified whenever memory is actually freed.
    notifier: Option<watch::Sender<()>>,
}
114
115impl WriteBufferManagerImpl {
116 pub fn new(global_write_buffer_size: usize) -> Self {
118 Self {
119 global_write_buffer_size,
120 mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
121 memory_used: AtomicUsize::new(0),
122 memory_active: AtomicUsize::new(0),
123 notifier: None,
124 }
125 }
126
127 pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
129 self.notifier = Some(notifier);
130 self
131 }
132
133 pub fn mutable_usage(&self) -> usize {
135 self.memory_active.load(Ordering::Relaxed)
136 }
137
138 fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
140 global_write_buffer_size / 2
142 }
143}
144
145impl WriteBufferManager for WriteBufferManagerImpl {
146 fn should_flush_engine(&self) -> bool {
147 let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
148 if mutable_memtable_memory_usage > self.mutable_limit {
149 debug!(
150 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
151 mutable_memtable_memory_usage,
152 self.memory_usage(),
153 self.mutable_limit,
154 self.global_write_buffer_size,
155 );
156 return true;
157 }
158
159 let memory_usage = self.memory_used.load(Ordering::Relaxed);
160 if memory_usage >= self.global_write_buffer_size {
164 if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
165 debug!(
166 "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
167 mutable_usage: {}.",
168 memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
169 );
170 return true;
171 } else {
172 trace!(
173 "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
174 memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
175 );
176 }
177 }
178
179 false
180 }
181
182 fn should_stall(&self) -> bool {
183 self.memory_usage() >= self.global_write_buffer_size
184 }
185
186 fn reserve_mem(&self, mem: usize) {
187 self.memory_used.fetch_add(mem, Ordering::Relaxed);
188 self.memory_active.fetch_add(mem, Ordering::Relaxed);
189 }
190
191 fn schedule_free_mem(&self, mem: usize) {
192 self.memory_active.fetch_sub(mem, Ordering::Relaxed);
193 }
194
195 fn free_mem(&self, mem: usize) {
196 self.memory_used.fetch_sub(mem, Ordering::Relaxed);
197 if let Some(notifier) = &self.notifier {
198 let _ = notifier.send(());
202 }
203 }
204
205 fn memory_usage(&self) -> usize {
206 self.memory_used.load(Ordering::Relaxed)
207 }
208}
209
/// Reason why a region is flushed.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// The global write buffer of the engine is full.
    EngineFull,
    /// Flush requested manually.
    Manual,
    /// Flush triggered by an alter operation.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush while downgrading the region leader.
    Downgrading,
}
226
impl FlushReason {
    /// Returns the static string form of the reason, used as a metric label.
    fn as_str(&self) -> &'static str {
        // `IntoStaticStr` derives the `From<&FlushReason>` conversion.
        self.into()
    }
}
233
/// Task to flush a region's immutable memtables into SST files.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush; used as a metric label and to select the expected
    /// region state when updating the manifest.
    pub(crate) reason: FlushReason,
    /// Senders waiting for the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Channel to the region worker, used to report the flush outcome.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Layer used to write and upload SST files.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified of flush lifecycle events (e.g. flush begin).
    pub(crate) listener: WorkerListener,
    /// Engine config providing write buffer size and index settings.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Optional override for the SST row group size.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager passed to SST write/put requests.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to apply the resulting edit to the region manifest.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options of the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore bounding concurrent SST write tasks.
    pub(crate) flush_semaphore: Arc<Semaphore>,
}
257
258impl RegionFlushTask {
259 pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
261 if let Some(sender) = sender.take_inner() {
262 self.senders.push(sender);
263 }
264 }
265
266 fn on_success(self) {
268 for sender in self.senders {
269 sender.send(Ok(0));
270 }
271 }
272
273 fn on_failure(&mut self, err: Arc<Error>) {
275 for sender in self.senders.drain(..) {
276 sender.send(Err(err.clone()).context(FlushRegionSnafu {
277 region_id: self.region_id,
278 }));
279 }
280 }
281
    /// Converts the task into a background [Job].
    ///
    /// The version data is captured before the job runs, so the job flushes
    /// exactly the memtables that are immutable at scheduling time.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Snapshot the current version outside of the async block.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }
296
    /// Runs the flush against the captured version snapshot and reports the
    /// result to the region worker.
    ///
    /// On success it sends a [FlushFinished] notification carrying the region
    /// edit and the ids of the flushed memtables; on failure it fails all
    /// waiting senders and sends a [FlushFailed] notification.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the immutable memtables that were flushed, so the
                // worker can remove them from the version.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer so failed flushes don't skew the histogram.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }
340
    /// Flushes the immutable memtables to SSTs and applies the resulting edit
    /// to the region manifest.
    ///
    /// Returns the [RegionEdit] recording the new level-0 files and the
    /// flushed entry id / sequence.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // The task-level row group size overrides the default.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A downgrading region flushes in the Downgrading state; otherwise we
        // expect the current Staging or Writable leader state.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }
430
    /// Writes each non-empty immutable memtable to SST files.
    ///
    /// Flat (record batch) memtables are flushed concurrently through
    /// [Self::flush_flat_mem_ranges]; other memtables are read through a
    /// merged/deduplicated [Source] and written sequentially.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        // An empty partition expression string is treated as "no expression".
        let partition_expr = match &version.metadata.partition_expr {
            None => None,
            Some(json_expr) if json_expr.is_empty() => None,
            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
        };
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            // Compaction failures are logged but don't abort the flush.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.stats.num_rows();
            let memtable_id = mem.id();
            series_count += mem_ranges.stats.series_count();

            if mem_ranges.is_record_batch() {
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    max_sequence,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // A source may produce no SST (e.g. no rows written).
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.stats.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;

                // `Left` marks a row-based source for the write request.
                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                if ssts_written.is_empty() {
                    // No rows were written to the SST.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }
565
    /// Flushes flat (record batch) memtable ranges concurrently, bounded by
    /// `flush_semaphore`.
    ///
    /// Already-encoded ranges are uploaded as-is via `put_sst`; the remaining
    /// sources are written via `write_sst`.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        let max_sequence = flat_sources.max_sequence;
        for source in flat_sources.sources {
            // `Right` marks a flat (record batch) source.
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                // The semaphore bounds the number of concurrent SST writes.
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                Ok((ssts, metrics))
            });
            tasks.push(task);
        }
        for encoded in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                // Pre-encoded ranges are uploaded directly.
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            max_sequence,
            num_sources,
            results,
        })
    }
626
    /// Builds a level-0 [FileMeta] for an SST produced by a flush.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            // Flushed files always start at level 0.
            level: 0,
            file_size: sst_info.file_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_file_id: None,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            // `NonZeroU64::new` maps a zero sequence to `None`.
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }
649
    /// Builds an [SstWriteRequest] to flush `source` using the task's cache,
    /// storage and index settings.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
        }
    }
670
    /// Sends `request` to the region worker.
    ///
    /// A send error (the receiver was dropped) is only logged since there is
    /// no one left to notify.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }
684
685 fn merge(&mut self, mut other: RegionFlushTask) {
687 assert_eq!(self.region_id, other.region_id);
688 self.senders.append(&mut other.senders);
690 }
691}
692
/// Result of flushing memtables in the flat (record batch) format.
struct FlushFlatMemResult {
    /// Number of pre-encoded ranges uploaded directly.
    num_encoded: usize,
    /// Max sequence of the flushed data.
    max_sequence: u64,
    /// Total number of flush tasks (sources plus encoded ranges).
    num_sources: usize,
    /// Per-task outcome: SSTs written and write metrics.
    results: Vec<Result<(SstInfoArray, Metrics)>>,
}
699
/// Aggregated result of flushing all immutable memtables of a region.
struct DoFlushMemtablesResult {
    /// Metadata of all SST files written.
    file_metas: Vec<FileMeta>,
    /// Total bytes written to SSTs.
    flushed_bytes: u64,
    /// Total number of series flushed.
    series_count: usize,
    /// Accumulated flush metrics.
    flush_metrics: Metrics,
}
706
/// Builds a [Source] to read rows from the given memtable ranges.
///
/// A single range is read directly. Multiple ranges are merged and, unless
/// the region is in append mode, deduplicated according to the merge mode.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        // `len() == 1` guarantees `next()` yields a value here.
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            // Append mode keeps duplicate rows.
            Box::new(merge_reader) as _
        } else {
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                }
                MergeMode::LastNonNull => {
                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                }
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}
739
/// Sources built from flat memtable ranges.
struct FlatSources {
    /// Max sequence of the ranges.
    max_sequence: u64,
    /// Iterators to write as new SSTs.
    sources: SmallVec<[FlatSource; 4]>,
    /// Ranges already encoded as SST data, to be uploaded directly.
    encoded: SmallVec<[EncodedRange; 4]>,
}
745
/// Builds [FlatSources] from `mem_ranges`.
///
/// Ranges are grouped until at least `min_flush_rows` rows are batched, so
/// we avoid producing many tiny SSTs; each group is merged (and deduplicated
/// unless in append mode). Pre-encoded ranges bypass merging entirely.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges, stats } = mem_ranges;
    let max_sequence = stats.max_sequence();
    let mut flat_sources = FlatSources {
        max_sequence,
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        // Single range: no merge needed.
        let only_range = ranges.into_values().next().unwrap();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push(encoded);
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            flat_sources.sources.push(FlatSource::Iter(iter));
        };
    } else {
        // Group threshold: 1/8 of the total rows, but at least one row group.
        let min_flush_rows = stats.num_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                flat_sources.encoded.push(encoded);
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();

            if last_iter_rows > min_flush_rows {
                // Enough rows batched: merge the collected iterators into one
                // source and start a new group.
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options,
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
                last_iter_rows = 0;
            }
        }

        // Merge the remaining iterators, if any.
        if !input_iters.is_empty() {
            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
        }
    }

    Ok(flat_sources)
}
809
/// Merges the input iterators into one sorted iterator and, unless the region
/// is in append mode, wraps it with a dedup iterator according to the region's
/// merge mode.
fn merge_and_dedup(
    schema: &SchemaRef,
    options: &RegionOptions,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if options.append_mode {
        // Append mode keeps duplicate rows.
        Box::new(merge_iter) as _
    } else {
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}
834
/// Tracks and schedules flush tasks, ensuring at most one running flush job
/// per region.
pub(crate) struct FlushScheduler {
    /// Flush status of tracked regions.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that runs the flush jobs.
    scheduler: SchedulerRef,
}
842
843impl FlushScheduler {
844 pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
846 FlushScheduler {
847 region_status: HashMap::new(),
848 scheduler,
849 }
850 }
851
    /// Returns true if the region already has a flush (running or pending).
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }
856
    /// Schedules a new flush task for the region.
    ///
    /// Finishes the task immediately when the region has nothing to flush.
    /// When a flush is already running or pending, the task is merged into
    /// the existing one; otherwise the mutable memtable is frozen and a flush
    /// job is submitted to the background scheduler.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // The region has nothing to flush.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Track the region, creating a fresh status when absent.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running: merge into the pending task.
            flush_status.merge_task(task);
            return Ok(());
        }

        if flush_status.pending_task.is_some() {
            // A task is already waiting for its turn: merge into it.
            flush_status.merge_task(task);
            return Ok(());
        }

        // Freeze the mutable memtable so the job flushes a stable snapshot.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            // Remove the status so the region isn't left stuck.
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit the flush job to the background scheduler.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            // Remove the status so the region isn't left stuck.
            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }
922
    /// Notifies the scheduler that a flush job of the region finished
    /// successfully.
    ///
    /// Returns the buffered DDL/write/bulk requests of the region when no
    /// more flushing is needed, so the caller can process them.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // No more tasks: stop tracking the region and hand back the
            // buffered requests.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The memtables are already empty, so the pending task can
                // succeed without running.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // There is still data to flush; keep the pending requests.
                None
            }
        };

        // Try to run the next region's pending flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }
978
    /// Notifies the scheduler that a flush job failed, failing all pending
    /// tasks and buffered requests of the region with `err`.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Stop tracking the region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Propagate the error to everything that was waiting on the flush.
        flush_status.on_failure(err);

        // Still try to run the next region's pending flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }
998
    /// Notifies the scheduler that the region was dropped; pending flushes
    /// fail with a `RegionDropped` error.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }
1006
    /// Notifies the scheduler that the region was closed; pending flushes
    /// fail with a `RegionClosed` error.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }
1011
    /// Notifies the scheduler that the region was truncated; pending flushes
    /// fail with a `RegionTruncated` error.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }
1019
1020 fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1021 let Some(flush_status) = self.region_status.remove(®ion_id) else {
1023 return;
1024 };
1025
1026 flush_status.on_failure(err);
1028 }
1029
    /// Buffers a DDL request until the region's flush is done.
    ///
    /// # Panics
    /// Panics if the region has no tracked flush status.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }
1038
    /// Buffers a write request until the region's flush is done.
    ///
    /// # Panics
    /// Panics if the region has no tracked flush status.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }
1050
    /// Buffers a bulk write request until the region's flush is done.
    ///
    /// # Panics
    /// Panics if the region has no tracked flush status.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }
1059
1060 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1062 self.region_status
1063 .get(®ion_id)
1064 .map(|status| !status.pending_ddls.is_empty())
1065 .unwrap_or(false)
1066 }
1067
    /// Picks a region with a pending flush task, if any, and schedules it.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        // Invariant: every tracked region is either flushing or has a task
        // waiting to run.
        debug_assert!(
            self.region_status
                .values()
                .all(|status| status.flushing || status.pending_task.is_some())
        );

        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        // The `find` above guarantees the pending task exists.
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
1091}
1092
impl Drop for FlushScheduler {
    fn drop(&mut self) {
        // Fail everything that is still waiting: dropping the scheduler means
        // the regions are treated as closed.
        for (region_id, flush_status) in self.region_status.drain() {
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}
1101
/// Flush state of a region tracked by the [FlushScheduler].
///
/// Holds the next flush task and the requests buffered while the region is
/// flushing.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush job of this region is currently running.
    flushing: bool,
    /// Task waiting for the next flush slot.
    pending_task: Option<RegionFlushTask>,
    /// DDL requests buffered until the flush finishes.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests buffered until the flush finishes.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests buffered until the flush finishes.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1124
1125impl FlushStatus {
1126 fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1127 FlushStatus {
1128 region_id,
1129 version_control,
1130 flushing: false,
1131 pending_task: None,
1132 pending_ddls: Vec::new(),
1133 pending_writes: Vec::new(),
1134 pending_bulk_writes: Vec::new(),
1135 }
1136 }
1137
1138 fn merge_task(&mut self, task: RegionFlushTask) {
1140 if let Some(pending) = &mut self.pending_task {
1141 pending.merge(task);
1142 } else {
1143 self.pending_task = Some(task);
1144 }
1145 }
1146
1147 fn on_failure(self, err: Arc<Error>) {
1148 if let Some(mut task) = self.pending_task {
1149 task.on_failure(err.clone());
1150 }
1151 for ddl in self.pending_ddls {
1152 ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1153 region_id: self.region_id,
1154 }));
1155 }
1156 for write_req in self.pending_writes {
1157 write_req
1158 .sender
1159 .send(Err(err.clone()).context(FlushRegionSnafu {
1160 region_id: self.region_id,
1161 }));
1162 }
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use tokio::sync::oneshot;
1169
1170 use super::*;
1171 use crate::cache::CacheManager;
1172 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1173 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1174 use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1175
1176 #[test]
1177 fn test_get_mutable_limit() {
1178 assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1179 assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1180 assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1181 assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1182 }
1183
    #[test]
    fn test_over_mutable_limit() {
        // Global limit 1000, so the mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Mutable usage becomes 800 > 500: flush.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Scheduling a free only decreases the active (mutable) counter.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // An actual free decreases the total used counter.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }
1207
    #[test]
    fn test_over_global() {
        // Global limit 1000, mutable limit 500.
        let manager = WriteBufferManagerImpl::new(1000);
        // Used 1100 >= 1000: stall.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Active drops to 900, still over the mutable limit: flush.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Active drops to 450: below both flush thresholds.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Active 500 >= global/2 while used is over the global limit: flush.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        // Active 600 > 500 mutable limit: flush.
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1228
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        // Reserving and scheduling a free don't trigger the notifier.
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only an actual free triggers the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1240
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        // A region with empty memtables finishes the task immediately without
        // tracking any status.
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1275
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Write some rows so the region has data to flush.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Create 3 identical flush tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
            })
            .collect();
        // The first task starts a flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The scheduled flush froze the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // The remaining tasks are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulate the flush result: remove the flushed memtable.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // The pending task succeeds directly (memtables are empty) and no new
        // job is submitted.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1356}