use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info, trace};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;

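/// Tracks the memory used by memtables across the engine so writes can
/// trigger a flush, or stall, when the write buffer fills up.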
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes for a write.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are about to be freed, e.g. because
    /// the memtables holding them are scheduled for flush.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes have been freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;
}

/// Reference-counted [WriteBufferManager].
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

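/// Default [WriteBufferManager] implementation.
///
/// A sketch of the intended accounting lifecycle (illustrative only, the
/// sizes are made up):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1024 * 1024);
/// manager.reserve_mem(4096);       // a write lands in a mutable memtable
/// manager.schedule_free_mem(4096); // the memtable is frozen for flush
/// manager.free_mem(4096);          // the flushed data is persisted
/// assert_eq!(0, manager.memory_usage());
/// ```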
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Global memory limit for all memtables.
    global_write_buffer_size: usize,
    /// Soft limit on mutable memtable memory; exceeding it triggers a flush.
    mutable_limit: usize,
    /// Total memory reserved by memtables.
    memory_used: AtomicUsize,
    /// Memory still held by mutable (unflushed) memtables.
    memory_active: AtomicUsize,
    /// Optional notifier signaled when memory is freed.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with the given `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier to wake up stalled requests once memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the current memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the mutable memtable limit: half of the global write buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // Even if the total usage exceeds the global limit, flushing only helps
        // when a meaningful portion of the memory is still mutable.
        if memory_usage >= self.global_write_buffer_size {
            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
                debug!(
                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
                     mutable_usage: {}.",
                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
                );
                return true;
            } else {
                trace!(
                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
                );
            }
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Wakes up stalled requests now that memory is available again.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}

/// Reason of a flush task.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches the limit of the write buffer.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter the region.
    Alter,
    /// Periodical flush.
    Periodically,
    /// Flush before the region downgrades to follower.
    Downgrading,
}

impl FlushReason {
    /// Returns the flush reason as a static string (the variant name).
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options of the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore that bounds the number of concurrent SST write jobs.
    pub(crate) flush_semaphore: Arc<Semaphore>,
}

impl RegionFlushTask {
    /// Pushes the sender if it is not none.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies the senders of success.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all senders.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Takes a snapshot of the version data before creating the job so the
        // job flushes a consistent set of immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task and notifies the worker of the result.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry id covered by the flushed data.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discards the timer without recording it.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables to SSTs, updates the manifest, and
    /// returns the applied [RegionEdit].
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry id covered by the flushed data.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!("Applying {edit:?} to region {}", self.region_id);

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // A staging region flushes in the staging state; otherwise the
            // region must be writable.
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // Updates the manifest while the region stays in the expected state.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Flushes each immutable memtable and returns the new file metas plus
    /// flush statistics.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        // Parses the partition expression once for all files.
        let partition_expr = match &version.metadata.partition_expr {
            None => None,
            Some(json_expr) if json_expr.is_empty() => None,
            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
        };
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skips empty memtables.
                continue;
            }

            let compact_start = std::time::Instant::now();
            // Compacts the memtable to reduce the data to read before flushing it.
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.stats.num_rows();
            let memtable_id = mem.id();
            series_count += mem_ranges.stats.series_count();

            if mem_ranges.is_record_batch() {
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    max_sequence,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // No data written.
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.stats.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;

                // Flushes the merged source to level 0.
                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                if ssts_written.is_empty() {
                    // No data written.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }

    /// Flushes flat (record batch) memtable ranges concurrently, bounded by
    /// the flush semaphore, and returns the per-source results.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        let max_sequence = flat_sources.max_sequence;
        for source in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                Ok((ssts, metrics))
            });
            tasks.push(task);
        }
        for encoded in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                // Encoded ranges are already in SST format, so put them directly.
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            max_sequence,
            num_sources,
            results,
        })
    }

    /// Builds the level 0 [FileMeta] for a flushed SST.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_file_id: None,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    /// Creates an SST write request for the flush.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
        }
    }

    /// Notifies the worker of the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task into this one.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Only the senders are merged; the reason of `self` is kept.
        self.senders.append(&mut other.senders);
    }
}

/// Result of flushing flat memtable ranges.
struct FlushFlatMemResult {
    num_encoded: usize,
    max_sequence: u64,
    num_sources: usize,
    results: Vec<Result<(SstInfoArray, Metrics)>>,
}

/// Result of [RegionFlushTask::do_flush_memtables].
struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    flush_metrics: Metrics,
}

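/// Builds a [Source] that reads all ranges of one immutable memtable: a single
/// range is read directly, while multiple ranges go through a merge reader
/// and, unless the region is in append mode, a dedup reader chosen by the
/// merge mode.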
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        // Multiple ranges have to be merged and deduplicated.
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            // No dedup in append mode.
            Box::new(merge_reader) as _
        } else {
            // Dedup according to the merge mode.
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
                }
                MergeMode::LastNonNull => {
                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
                }
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

struct FlatSources {
    max_sequence: u64,
    sources: SmallVec<[FlatSource; 4]>,
    encoded: SmallVec<[EncodedRange; 4]>,
}

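/// Groups the ranges of a memtable into [FlatSource]s for flushing, reusing
/// already encoded ranges as-is.
///
/// Ranges are batched into one merge-and-dedup iterator until the batch holds
/// more than `max(stats.num_rows / 8, DEFAULT_ROW_GROUP_SIZE)` rows, which
/// caps the number of merged output sources at roughly 8 per memtable.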
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges, stats } = mem_ranges;
    let max_sequence = stats.max_sequence();
    let mut flat_sources = FlatSources {
        max_sequence,
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        let only_range = ranges.into_values().next().unwrap();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push(encoded);
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            // A single range may still contain duplicate rows, so dedup is
            // applied unless the region is in append mode.
            let iter = maybe_dedup_one(options, field_column_start, iter);
            flat_sources.sources.push(FlatSource::Iter(iter));
        };
    } else {
        let min_flush_rows = stats.num_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                flat_sources.encoded.push(encoded);
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();

            if last_iter_rows > min_flush_rows {
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options,
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
                last_iter_rows = 0;
            }
        }

        // Flushes the remaining iterators.
        if !input_iters.is_empty() {
            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
        }
    }

    Ok(flat_sources)
}

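/// Merges the input iterators into a single sorted iterator and applies dedup
/// according to the region options; append-mode regions skip dedup.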
fn merge_and_dedup(
    schema: &SchemaRef,
    options: &RegionOptions,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if options.append_mode {
        // No dedup in append mode.
        Box::new(merge_iter) as _
    } else {
        // Dedup according to the merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}

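/// Applies dedup to a single-range iterator according to the region options;
/// append-mode regions return the iterator unchanged.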
fn maybe_dedup_one(
    options: &RegionOptions,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if options.append_mode {
        // No dedup in append mode.
        input_iter
    } else {
        // Dedup according to the merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

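/// Manages background flushes of a worker: at most one flush job runs per
/// region, and subsequent tasks for that region merge into a single pending
/// task.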
pub(crate) struct FlushScheduler {
    /// Tracks regions that are flushing or have pending flush tasks.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush `task` for the specific region.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // Adds this region to the status map.
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running; merge the new task.
            flush_status.merge_task(task);
            return Ok(());
        }

        // A task is already pending; merge the new task instead of scheduling it.
        if flush_status.pending_task.is_some() {
            flush_status.merge_task(task);
            return Ok(());
        }

        // Now we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submits a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            self.region_status.remove(&region_id);
            return Err(e);
        }

        flush_status.flushing = true;

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't have any flush task.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;

        // The region no longer has a running flush job.
        flush_status.flushing = false;

        let pending_requests = if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status exists.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // The region has nothing to flush, so notify the pending task directly.
                // Safety: The pending task is not None.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                // Safety: The flush status exists.
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((
                    flush_status.pending_ddls,
                    flush_status.pending_writes,
                    flush_status.pending_bulk_writes,
                ))
            } else {
                // The region still needs to flush for the pending task.
                None
            }
        };

        // Schedules flush for other regions.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }

        pending_requests
    }

    /// Notifies the scheduler that the flush job has failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Removes this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fails the pending task and all pending requests.
        flush_status.on_failure(err);

        // Still tries to schedule the next flush.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after flush of region {} failed", region_id);
        }
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Removes this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fails the pending task and all pending requests.
        flush_status.on_failure(err);
    }

    /// Adds a ddl request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules a new flush task when the scheduler can submit the next task.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        debug_assert!(
            self.region_status
                .values()
                .all(|status| status.flushing || status.pending_task.is_some())
        );

        // Takes the first region with a pending task from the status map.
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        debug_assert!(!flush_status.flushing);
        // Safety: The pending task is not None.
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();

        self.schedule_flush(region_id, &version_control, task)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // The scheduler is shutting down, so notify all pending tasks.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

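/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks the running flush task of a region and the requests that must wait
/// until the flush finishes.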
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Whether a flush task for this region is running.
    flushing: bool,
    /// Task waiting for the next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Pending write requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Pending bulk write requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task into the pending task, or stores it if there is none.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Fails the pending task and all pending requests with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

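    // Sanity check: strum's `IntoStaticStr` uses the variant name by default
    // (no `serialize_all` attribute on the enum), so `as_str` should return
    // the variant name verbatim.
    #[test]
    fn test_flush_reason_as_str() {
        assert_eq!("Others", FlushReason::Others.as_str());
        assert_eq!("EngineFull", FlushReason::EngineFull.as_str());
        assert_eq!("Downgrading", FlushReason::Downgrading.as_str());
    }
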
    #[test]
    fn test_over_mutable_limit() {
        // The mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // The accumulated bytes exceed the mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Mutable bytes are scheduled to free, no longer over the mutable limit.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Freed 400 bytes.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        // The global limit is 1000.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Mutable usage (900) is still over the mutable limit.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());

        // Mutable usage (450) drops below half of the global limit, so the
        // engine won't flush even though the total usage is over the limit.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());

        // Mutable usage reaches half of the global limit again.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only `free_mem` signals the notifier.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes rows so the memtable is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
            })
            .collect();
        // Schedules the first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule one flush job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable is frozen; the immutable memtable has id 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedules the remaining tasks; they merge into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Applies an edit that removes the memtable, simulating a finished flush.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // The pending task is notified directly, so no new flush job is scheduled.
        assert_eq!(1, job_scheduler.num_jobs());
        // The region status is cleared.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Builds a bulk part with two rows sharing the same primary key and timestamp.
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Builds the memtable ranges under the given `append_mode`.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // With append mode disabled, duplicates are removed.
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Counts the rows from all sources.
            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // With append mode enabled, duplicates are preserved.
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
}