use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;

/// Manager that tracks the write buffer (memtable) memory usage of the engine
/// and decides when to flush or stall writes.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether memory usage is high enough to trigger an engine flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall incoming write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes of write buffer memory.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are about to be freed, e.g. when a
    /// memtable is frozen for flushing. The bytes still count toward the total
    /// usage but no longer toward the mutable usage.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memory limit that triggers a flush.
    fn flush_limit(&self) -> usize;
}

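// A minimal sketch of a custom implementation (an illustrative assumption,
// not part of this crate: an unbounded manager for tests that never flushes
// or stalls):
//
//     #[derive(Debug, Default)]
//     struct NoopBufferManager(AtomicUsize);
//
//     impl WriteBufferManager for NoopBufferManager {
//         fn should_flush_engine(&self) -> bool { false }
//         fn should_stall(&self) -> bool { false }
//         fn reserve_mem(&self, mem: usize) { self.0.fetch_add(mem, Ordering::Relaxed); }
//         fn schedule_free_mem(&self, _mem: usize) {}
//         fn free_mem(&self, mem: usize) { self.0.fetch_sub(mem, Ordering::Relaxed); }
//         fn memory_usage(&self) -> usize { self.0.load(Ordering::Relaxed) }
//         fn flush_limit(&self) -> usize { usize::MAX }
//     }
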
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Total memory size limit for all memtables.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit that triggers a flush.
    mutable_limit: usize,
    /// Memory in use by all memtables (mutable and immutable).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (i.e. mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier signaled whenever memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the specific `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier to the manager.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage >= self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        if memory_usage >= self.global_write_buffer_size {
            return true;
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Memory is freed, notify waiters (e.g. stalled write requests).
            // It is fine to ignore the error when all receivers are gone.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    fn flush_limit(&self) -> usize {
        self.mutable_limit
    }
}

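// A minimal sketch of the accounting lifecycle this manager expects (an
// illustration mirroring the unit tests below; the sizes are arbitrary):
//
//     let manager = WriteBufferManagerImpl::new(1024 * 1024 * 1024);
//     manager.reserve_mem(4096);           // a write lands in a mutable memtable
//     if manager.should_flush_engine() {
//         manager.schedule_free_mem(4096); // the memtable is frozen for flushing
//     }
//     manager.free_mem(4096);              // SSTs are written, memory is released
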
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches the flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter a table.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush while downgrading the region.
    Downgrading,
    /// Flush before entering the staging state.
    EnterStaging,
}

impl FlushReason {
    /// Returns the flush reason as a static string.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options of the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore that limits concurrent flush jobs.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in the staging state.
    pub(crate) is_staging: bool,
}

impl RegionFlushTask {
    /// Pushes a new flush result `sender` to the task.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all waiters of success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiters.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Capture the version data in the worker thread so no other thread
        // modifies the memtables while the job is created.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker about the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer on failure.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables to SSTs, updates the manifest and
    /// returns the applied [RegionEdit].
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // Use the version captured when the task was converted into a job so
        // the memtable list stays consistent with the worker.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry id and committed sequence of the version are flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // A staging region can also flush, so check the current state.
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // Persist the edit to the manifest while the region is in the
        // expected state.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes the immutable memtables and returns the written file metadata.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        // Parse the partition expression of the region, if any.
        let partition_expr = match &version.metadata.partition_expr {
            None => None,
            Some(json_expr) if json_expr.is_empty() => None,
            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
        };
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }

            let compact_start = std::time::Instant::now();
            // Compact the memtable to reduce memory usage before flushing it.
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.stats.num_rows();
            let memtable_id = mem.id();
            // Sum the series count across all memtables of the region.
            series_count += mem_ranges.stats.series_count();

            if mem_ranges.is_record_batch() {
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    max_sequence,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // No data written.
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.stats.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;
                // Flush to level 0.
                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                if ssts_written.is_empty() {
                    // No data written.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }

    /// Flushes flat (record batch) memtable ranges, writing each source and
    /// uploading each already encoded range as a concurrent task.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        let max_sequence = flat_sources.max_sequence;
        for source in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                Ok((ssts, metrics))
            });
            tasks.push(task);
        }
        for encoded in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                // The range is already an encoded SST, upload it directly.
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            max_sequence,
            num_sources,
            results,
        })
    }

    /// Builds a [FileMeta] for an SST flushed to level 0.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    /// Builds an [SstWriteRequest] that flushes the `source`.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
        }
    }

    /// Notifies the region worker about the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges `other` into this task.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Only the senders are merged; the reason of `other` is discarded.
        self.senders.append(&mut other.senders);
    }
}

struct FlushFlatMemResult {
    num_encoded: usize,
    max_sequence: u64,
    num_sources: usize,
    results: Vec<Result<(SstInfoArray, Metrics)>>,
}

struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    flush_metrics: Metrics,
}

/// Builds a [Source] that reads the given memtable ranges, merging and
/// deduplicating rows when there is more than one range.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        // Multiple ranges, merge them.
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            // No dedup in append mode.
            Box::new(merge_reader) as _
        } else {
            // Dedup according to the region's merge mode.
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
                }
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    merge_reader,
                    LastNonNull::new(false),
                    None,
                )) as _,
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

/// Sources built from flat memtable ranges.
struct FlatSources {
    max_sequence: u64,
    sources: SmallVec<[FlatSource; 4]>,
    encoded: SmallVec<[EncodedRange; 4]>,
}

/// Builds [FlatSources] from memtable ranges.
///
/// Already encoded ranges are collected as-is; the remaining ranges are
/// grouped into merged (and optionally deduplicated) iterators.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges, stats } = mem_ranges;
    let max_sequence = stats.max_sequence();
    let mut flat_sources = FlatSources {
        max_sequence,
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        let only_range = ranges.into_values().next().unwrap();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push(encoded);
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            // A single range may still contain duplicate rows, so it goes
            // through dedup unless the region is in append mode.
            let iter = maybe_dedup_one(options, field_column_start, iter);
            flat_sources.sources.push(FlatSource::Iter(iter));
        };
    } else {
        // Group ranges so that each flushed SST gets at least `min_flush_rows`
        // input rows: an eighth of the total rows, but no less than one row group.
        let min_flush_rows = stats.num_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                flat_sources.encoded.push(encoded);
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();

            if last_iter_rows > min_flush_rows {
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options,
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
                last_iter_rows = 0;
            }
        }

        // Merge the remaining iterators, if any.
        if !input_iters.is_empty() {
            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
        }
    }

    Ok(flat_sources)
}

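// Worked example of the grouping heuristic in `memtable_flat_sources` (the
// numbers are illustrative; DEFAULT_ROW_GROUP_SIZE is whatever the parquet
// module defines): with stats.num_rows = 4_000_000 and a row group size of
// 102_400, min_flush_rows = max(4_000_000 / 8, 102_400) = 500_000, so the
// ranges are merged into at most eight concurrently written SSTs, while a
// memtable smaller than one row group ends up as a single merged source.
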
/// Merges `input_iters` and wraps the result in a dedup iterator if needed.
fn merge_and_dedup(
    schema: &SchemaRef,
    options: &RegionOptions,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if options.append_mode {
        // No dedup in append mode.
        Box::new(merge_iter) as _
    } else {
        // Dedup according to the region's merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}

/// Wraps a single iterator in a dedup iterator if needed.
fn maybe_dedup_one(
    options: &RegionOptions,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if options.append_mode {
        // No dedup in append mode.
        input_iter
    } else {
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

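// Illustration of the two merge modes applied by the dedup iterators above
// when rows share the same primary key and timestamp (values are
// illustrative; "newer" means a higher sequence number):
//
//     older: ts=1000, field_a=1.0,  field_b=NULL
//     newer: ts=1000, field_a=NULL, field_b=2.0
//
// LastRow keeps the newer row as-is: (NULL, 2.0).
// LastNonNull keeps the newer row but fills its null fields from older
// duplicates: (1.0, 2.0).
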
/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
    /// Tracks regions that are flushing or waiting to flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Freezes the mutable memtable of the region and submits the flush job
    /// to the background scheduler.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;
        // Freeze the mutable memtable so the flush job sees a stable set of
        // immutable memtables.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        // Submit the flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush `task` for the region.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush, succeed immediately.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            // A flush is already running; merge the new task into the pending
            // task so it runs after the current flush finishes.
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;
        // Track the region as flushing.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job of the region succeeded.
    ///
    /// Returns all pending requests of the region if it doesn't need to flush
    /// again; returns `None` if a new flush is scheduled.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The region has nothing to flush, notify the pending task directly.
            let task = flush_status.pending_task.take().unwrap();
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        debug!("Scheduling pending flush task for region {}", region_id);
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        // Pending requests stay queued while the region has a new flush task.
        None
    }

    /// Notifies the scheduler that the flush job has failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove the region and fail all pending tasks and requests.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove the region and notify all pending tasks and requests.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the region's pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is dropped, notify all pending tasks and requests.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
struct FlushStatus {
    /// Region being flushed.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending DDL requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Pending bulk write requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task with the pending task, or stores it if there is none.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

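    // A small check of FlushReason's string form (an assumption worth noting:
    // with no strum attributes on the enum, IntoStaticStr yields the variant
    // name verbatim).
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Manual", FlushReason::Manual.as_str());
    }
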
    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 1000 / 2 = 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // Mutable usage is now 800, over the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Schedule to free some memory, e.g. a memtable is frozen.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Actually free the memory.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage stays at 1100 after scheduling a free.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Mutable usage drops to 450, but global usage is still over the limit.
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Mutable usage reaches the 500 limit again.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrite the default builder to use a memtable we can write to.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Write something to the memtable so the region has data to flush.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Create 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
            })
            .collect();
        // Schedule the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen: the immutable memtable has id 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule the remaining tasks; they are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Apply an edit that removes the immutable memtable, simulating a
        // finished flush.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // The pending task is fulfilled without scheduling a new job.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    // Verifies that a single flat range is deduplicated unless the region is
    // in append mode.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        let metadata = metadata_for_test();
        // Build the flat SST schema for the metadata.
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let capacity = 16;
        // Build a bulk part with two rows sharing the same key and timestamp.
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Builds memtable ranges from a bulk memtable with the given append mode.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // With append_mode disabled, duplicates are removed.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Count the rows from all sources.
            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // With append_mode enabled, duplicates are preserved.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrite the default builder to use a memtable we can write to.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Write something to the memtable so the region has data to flush.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Create 2 tasks.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
            })
            .collect();
        // Schedule the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedule the second task; it becomes the pending task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // The mutable memtable is frozen: the immutable memtable has id 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Apply an edit that removes the immutable memtable, simulating a
        // finished flush.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        // Write new rows so the pending flush still has data to process.
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        // The pending task is scheduled as a new flush job.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}