1use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info};
24use datatypes::arrow::datatypes::SchemaRef;
25use partition::expr::PartitionExpr;
26use smallvec::{SmallVec, smallvec};
27use snafu::ResultExt;
28use store_api::storage::{RegionId, SequenceNumber};
29use strum::IntoStaticStr;
30use tokio::sync::{Semaphore, mpsc, watch};
31
32use crate::access_layer::{
33 AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
34};
35use crate::cache::CacheManagerRef;
36use crate::config::MitoConfig;
37use crate::error::{
38 Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
39 RegionTruncatedSnafu, Result,
40};
41use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
42use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
43use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
44use crate::metrics::{
45 FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
46 INFLIGHT_FLUSH_COUNT,
47};
48use crate::read::FlatSource;
49use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
50use crate::read::flat_merge::FlatMergeIterator;
51use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
52use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
53use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
54use crate::request::{
55 BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
56 SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
57};
58use crate::schedule::scheduler::{Job, SchedulerRef};
59use crate::sst::file::FileMeta;
60use crate::sst::parquet::{
61 DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
62};
63use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
64use crate::worker::WorkerListener;
65
/// Tracks engine-wide write buffer (memtable) memory usage and decides when
/// the engine should flush memtables or stall incoming writes.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should schedule a flush because the mutable
    /// usage or the total usage reached its limit.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether writes should be stalled until memory is freed.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes of write buffer memory.
    fn reserve_mem(&self, mem: usize);

    /// Marks `mem` bytes as no longer mutable (e.g. the memtable was frozen);
    /// the bytes still count against the total usage until [`Self::free_mem`].
    fn schedule_free_mem(&self, mem: usize);

    /// Releases `mem` bytes from the total usage (e.g. the flush finished).
    fn free_mem(&self, mem: usize);

    /// Returns the total memory currently tracked by the manager.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memory limit that triggers flushes.
    fn flush_limit(&self) -> usize;
}
97
/// Reference-counted [`WriteBufferManager`] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
99
/// Default [`WriteBufferManager`] implementation backed by atomic counters.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Total write buffer size limit of the engine.
    global_write_buffer_size: usize,
    /// Mutable usage limit (half of the global size) that triggers flushes.
    mutable_limit: usize,
    /// Total bytes reserved (mutable plus not-yet-freed frozen memory).
    memory_used: AtomicUsize,
    /// Bytes currently held by mutable memtables.
    memory_active: AtomicUsize,
    /// Optional channel notified whenever memory is freed.
    notifier: Option<watch::Sender<()>>,
}
118
119impl WriteBufferManagerImpl {
120 pub fn new(global_write_buffer_size: usize) -> Self {
122 Self {
123 global_write_buffer_size,
124 mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
125 memory_used: AtomicUsize::new(0),
126 memory_active: AtomicUsize::new(0),
127 notifier: None,
128 }
129 }
130
131 pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
133 self.notifier = Some(notifier);
134 self
135 }
136
137 pub fn mutable_usage(&self) -> usize {
139 self.memory_active.load(Ordering::Relaxed)
140 }
141
142 fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
144 global_write_buffer_size / 2
146 }
147}
148
149impl WriteBufferManager for WriteBufferManagerImpl {
150 fn should_flush_engine(&self) -> bool {
151 let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
152 if mutable_memtable_memory_usage >= self.mutable_limit {
153 debug!(
154 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
155 mutable_memtable_memory_usage,
156 self.memory_usage(),
157 self.mutable_limit,
158 self.global_write_buffer_size,
159 );
160 return true;
161 }
162
163 let memory_usage = self.memory_used.load(Ordering::Relaxed);
164 if memory_usage >= self.global_write_buffer_size {
165 return true;
166 }
167
168 false
169 }
170
171 fn should_stall(&self) -> bool {
172 self.memory_usage() >= self.global_write_buffer_size
173 }
174
175 fn reserve_mem(&self, mem: usize) {
176 self.memory_used.fetch_add(mem, Ordering::Relaxed);
177 self.memory_active.fetch_add(mem, Ordering::Relaxed);
178 }
179
180 fn schedule_free_mem(&self, mem: usize) {
181 self.memory_active.fetch_sub(mem, Ordering::Relaxed);
182 }
183
184 fn free_mem(&self, mem: usize) {
185 self.memory_used.fetch_sub(mem, Ordering::Relaxed);
186 if let Some(notifier) = &self.notifier {
187 let _ = notifier.send(());
191 }
192 }
193
194 fn memory_usage(&self) -> usize {
195 self.memory_used.load(Ordering::Relaxed)
196 }
197
198 fn flush_limit(&self) -> usize {
199 self.mutable_limit
200 }
201}
202
/// Reason a flush task was requested; the variant name is used as a metric
/// label via `IntoStaticStr`.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// The engine-wide write buffer is full.
    EngineFull,
    /// Manually requested flush.
    Manual,
    /// Flush requested by an alter operation.
    Alter,
    /// Periodic flush.
    Periodically,
    /// Flush while the region leader is downgrading.
    Downgrading,
    /// Flush when the region enters the staging state.
    EnterStaging,
    /// Flush while closing the region.
    Closing,
}
223
224impl FlushReason {
225 fn as_str(&self) -> &'static str {
227 self.into()
228 }
229}
230
/// Task that flushes the immutable memtables of a region into SST files.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Why this flush was requested.
    pub(crate) reason: FlushReason,
    /// Result senders notified when the flush finishes or fails.
    pub(crate) senders: Vec<OutputTx>,
    /// Channel used to send notifications back to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Layer used to write SST files.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener for flush lifecycle events.
    pub(crate) listener: WorkerListener,
    /// Engine configuration (write buffer size, index configs, ...).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Optional row group size override for written SSTs.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager shared across the engine.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to update the region manifest after the flush.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index build options of the region.
    pub(crate) index_options: IndexOptions,
    /// Limits the number of concurrent SST write jobs.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Serialized partition expression of the region, if any.
    pub(crate) partition_expr: Option<String>,
}
260
impl RegionFlushTask {
    /// Adds a result sender to notify when the flush completes.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Notifies all waiters that the flush succeeded.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Notifies all waiters that the flush failed with `err`.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the task into a background job.
    ///
    /// The version data is captured here, at schedule time, so the job
    /// flushes a stable snapshot of the immutable memtables.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush and sends the outcome (finished or failed) back to the
    /// region worker.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the flushed memtables so the worker can remove them.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                    flush_reason: self.reason,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Don't record the elapsed time of a failed flush.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes the immutable memtables into SSTs, applies the resulting edit
    /// to the region manifest and returns the edit.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        // Summarize what was written for the log line below.
        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // A downgrading flush must observe the downgrading leader state;
        // otherwise expect the current staging/writable state.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    /// Writes each immutable memtable to SSTs and aggregates file metadata,
    /// byte/series counts and write metrics.
    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        let mut encoded_part_count = 0;
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
        for mem in memtables {
            if mem.is_empty() {
                continue;
            }

            // Compact the memtable first; a failure here is logged but does
            // not abort the flush.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();

            let num_mem_rows = mem_ranges.num_rows();
            let memtable_series_count = mem_ranges.series_count();
            let memtable_id = mem.id();
            series_count += memtable_series_count;

            let flush_start = Instant::now();
            let FlushFlatMemResult {
                num_encoded,
                num_sources,
                results,
            } = self
                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                .await?;
            encoded_part_count += num_encoded;
            for (source_idx, result) in results.into_iter().enumerate() {
                let (max_sequence, ssts_written, metrics) = result?;
                if ssts_written.is_empty() {
                    continue;
                }

                common_telemetry::debug!(
                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                    self.region_id,
                    memtable_id,
                    source_idx,
                    num_sources,
                    metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            }

            common_telemetry::debug!(
                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                self.region_id,
                num_sources,
                memtable_id,
                num_mem_ranges,
                num_encoded,
                num_mem_rows,
                flush_start.elapsed(),
                compact_cost,
            );
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        })
    }

    /// Spawns concurrent write jobs (bounded by `flush_semaphore`) for both
    /// iterator sources and pre-encoded parts, then joins their results.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let field_column_start =
            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            field_column_start,
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        // Iterator sources go through the full SST write path.
        for (source, max_sequence) in flat_sources.sources {
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        // Pre-encoded parts are uploaded as-is via `put_sst`.
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }

    /// Builds the [`FileMeta`] of a newly flushed SST; flushed files always
    /// land at level 0.
    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    /// Builds the SST write request for `source`; the output format follows
    /// the region's `sst_format` option, falling back to the engine default.
    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: FlatSource,
    ) -> SstWriteRequest {
        let flat_format = version
            .options
            .sst_format
            .map(|f| f == FormatType::Flat)
            .unwrap_or(self.engine_config.default_experimental_flat_format);
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            sst_write_format: if flat_format {
                FormatType::Flat
            } else {
                FormatType::PrimaryKey
            },
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            #[cfg(feature = "vector_index")]
            vector_index_config: self.engine_config.vector_index.clone(),
        }
    }

    /// Sends `request` back to the region worker loop; failures are logged
    /// but not propagated.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task of the same region by taking over its waiters.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        self.senders.append(&mut other.senders);
    }
}
681
/// Result of flushing the ranges of one memtable.
struct FlushFlatMemResult {
    /// Number of pre-encoded parts written directly.
    num_encoded: usize,
    /// Total number of spawned write tasks (iterator sources plus encoded parts).
    num_sources: usize,
    /// Per-task results: max sequence, SSTs written and write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
687
/// Aggregated result of flushing all immutable memtables of a region.
struct DoFlushMemtablesResult {
    /// Metadata of every SST file written.
    file_metas: Vec<FileMeta>,
    /// Total bytes of the written SST files.
    flushed_bytes: u64,
    /// Total number of series flushed.
    series_count: usize,
    /// Number of pre-encoded parts written.
    encoded_part_count: usize,
    /// Merged write metrics of all flush tasks.
    flush_metrics: Metrics,
}
695
/// Sources to build flat-format SSTs from.
struct FlatSources {
    /// Iterator sources paired with their max sequence numbers.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Pre-encoded parts paired with their max sequence numbers.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
700
/// Builds [`FlatSources`] from the memtable ranges.
///
/// Ranges that already carry encoded SST bytes are collected into `encoded`;
/// the rest are turned into (possibly merged and deduplicated) record batch
/// iterators in `sources`, grouped so each output is reasonably sized.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        debug!("Flushing single flat range");

        // Only one range: no merge needed, dedup it alone if required.
        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None, None)?;
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        // Multiple ranges: batch them so each merged source holds at least
        // `min_flush_rows` rows.
        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
        // Total rows of ranges that are not already encoded.
        let total_rows: usize = ranges
            .values()
            .filter(|r| r.encoded().is_none())
            .map(|r| r.num_rows())
            .sum();
        debug!(
            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
            total_rows,
            min_flush_rows,
            ranges.len()
        );
        let mut rows_remaining = total_rows;
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            // Encoded ranges bypass merging and are written as-is.
            if let Some(encoded) = range.encoded() {
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None, None)?;
            input_iters.push(iter);
            let range_rows = range.num_rows();
            last_iter_rows += range_rows;
            rows_remaining -= range_rows;
            current_ranges.push(range);

            // Emit the accumulated group once it is large enough, unless the
            // leftover rows would be too few to fill a row group (then keep
            // accumulating to avoid a tiny trailing source).
            if last_iter_rows >= min_flush_rows
                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
            {
                debug!(
                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                    last_iter_rows,
                    min_flush_rows,
                    input_iters.len(),
                    rows_remaining
                );

                // The merged source's sequence is the max across its inputs.
                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Emit whatever is left over as the final merged source.
        if !input_iters.is_empty() {
            debug!(
                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                last_iter_rows,
                min_flush_rows,
                input_iters.len(),
                rows_remaining
            );
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}
835
836pub fn merge_and_dedup(
881 schema: &SchemaRef,
882 append_mode: bool,
883 merge_mode: MergeMode,
884 field_column_start: usize,
885 input_iters: Vec<BoxedRecordBatchIterator>,
886) -> Result<BoxedRecordBatchIterator> {
887 let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
888 let maybe_dedup = if append_mode {
889 Box::new(merge_iter) as _
891 } else {
892 match merge_mode {
894 MergeMode::LastRow => {
895 Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
896 }
897 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
898 merge_iter,
899 FlatLastNonNull::new(field_column_start, false),
900 )) as _,
901 }
902 };
903 Ok(maybe_dedup)
904}
905
906pub fn maybe_dedup_one(
907 append_mode: bool,
908 merge_mode: MergeMode,
909 field_column_start: usize,
910 input_iter: BoxedRecordBatchIterator,
911) -> BoxedRecordBatchIterator {
912 if append_mode {
913 input_iter
915 } else {
916 match merge_mode {
918 MergeMode::LastRow => {
919 Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
920 }
921 MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
922 input_iter,
923 FlatLastNonNull::new(field_column_start, false),
924 )),
925 }
926 }
927}
928
/// Tracks and schedules flush tasks for the regions of a worker.
pub(crate) struct FlushScheduler {
    /// Per-region flush state; a region present here has a flush running or
    /// pending.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that runs the flush jobs.
    scheduler: SchedulerRef,
}
936
impl FlushScheduler {
    /// Creates a flush scheduler backed by `scheduler`.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already has a flush running or pending.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Freezes the region's mutable memtable and submits the flush job.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // Freeze the mutable memtable first so the job flushes a stable set
        // of immutable memtables.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush task for the region.
    ///
    /// Completes immediately when there is nothing to flush; merges the task
    /// into the pending one when a flush for this region is already running.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // Nothing to flush: notify waiters of success right away.
            task.on_success();
            return Ok(());
        }

        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            // A flush is already in flight; queue this task behind it.
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Track the region so later tasks/requests wait for this flush.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Handles a successful flush of the region.
    ///
    /// Returns the stalled DDL/write/bulk requests to re-process when the
    /// region has no more flush work; returns `None` when a pending flush was
    /// scheduled instead.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The pending task has nothing left to flush; complete it now.
            let task = flush_status.pending_task.take().unwrap();
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        debug!("Scheduling pending flush task for region {}", region_id);
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        None
    }

    /// Handles a failed flush: fails all pending tasks and requests of the region.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Cancels pending flushes because the region was dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Cancels pending flushes because the region was closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Cancels pending flushes because the region was truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region's status and fails everything pending with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        flush_status.on_failure(err);
    }

    /// Queues a DDL request behind the region's in-flight flush.
    ///
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Queues a write request behind the region's in-flight flush.
    ///
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Queues a bulk write request behind the region's in-flight flush.
    ///
    /// Panics if the region has no flush in progress.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has stalled DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}
1165
1166impl Drop for FlushScheduler {
1167 fn drop(&mut self) {
1168 for (region_id, flush_status) in self.region_status.drain() {
1169 flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1171 }
1172 }
1173}
1174
/// Flush state of a region: the pending flush task and the requests stalled
/// behind the in-flight flush.
struct FlushStatus {
    /// Region this status belongs to.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting to run after the current flush finishes.
    pending_task: Option<RegionFlushTask>,
    /// DDL requests stalled until the flush completes.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests stalled until the flush completes.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests stalled until the flush completes.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1192
1193impl FlushStatus {
1194 fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1195 FlushStatus {
1196 region_id,
1197 version_control,
1198 pending_task: None,
1199 pending_ddls: Vec::new(),
1200 pending_writes: Vec::new(),
1201 pending_bulk_writes: Vec::new(),
1202 }
1203 }
1204
1205 fn merge_task(&mut self, task: RegionFlushTask) {
1207 if let Some(pending) = &mut self.pending_task {
1208 pending.merge(task);
1209 } else {
1210 self.pending_task = Some(task);
1211 }
1212 }
1213
1214 fn on_failure(self, err: Arc<Error>) {
1215 if let Some(mut task) = self.pending_task {
1216 task.on_failure(err.clone());
1217 }
1218 for ddl in self.pending_ddls {
1219 ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1220 region_id: self.region_id,
1221 }));
1222 }
1223 for write_req in self.pending_writes {
1224 write_req
1225 .sender
1226 .send(Err(err.clone()).context(FlushRegionSnafu {
1227 region_id: self.region_id,
1228 }));
1229 }
1230 }
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235 use mito_codec::row_converter::build_primary_key_codec;
1236 use tokio::sync::oneshot;
1237
1238 use super::*;
1239 use crate::cache::CacheManager;
1240 use crate::memtable::bulk::part::BulkPartConverter;
1241 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1242 use crate::memtable::{Memtable, RangesOptions};
1243 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1244 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1245 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1246 use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1247
1248 #[test]
1249 fn test_get_mutable_limit() {
1250 assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1251 assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1252 assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1253 assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1254 }
1255
1256 #[test]
1257 fn test_over_mutable_limit() {
1258 let manager = WriteBufferManagerImpl::new(1000);
1260 manager.reserve_mem(400);
1261 assert!(!manager.should_flush_engine());
1262 assert!(!manager.should_stall());
1263
1264 manager.reserve_mem(400);
1266 assert!(manager.should_flush_engine());
1267
1268 manager.schedule_free_mem(400);
1270 assert!(!manager.should_flush_engine());
1271 assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1272 assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1273
1274 manager.free_mem(400);
1276 assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1277 assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1278 }
1279
1280 #[test]
1281 fn test_over_global() {
1282 let manager = WriteBufferManagerImpl::new(1000);
1284 manager.reserve_mem(1100);
1285 assert!(manager.should_stall());
1286 manager.schedule_free_mem(200);
1288 assert!(manager.should_flush_engine());
1289 assert!(manager.should_stall());
1290
1291 manager.schedule_free_mem(450);
1293 assert!(manager.should_flush_engine());
1294 assert!(manager.should_stall());
1295
1296 manager.reserve_mem(50);
1298 assert!(manager.should_flush_engine());
1299 manager.reserve_mem(100);
1300 assert!(manager.should_flush_engine());
1301 }
1302
1303 #[test]
1304 fn test_manager_notify() {
1305 let (sender, receiver) = watch::channel(());
1306 let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1307 manager.reserve_mem(500);
1308 assert!(!receiver.has_changed().unwrap());
1309 manager.schedule_free_mem(500);
1310 assert!(!receiver.has_changed().unwrap());
1311 manager.free_mem(500);
1312 assert!(receiver.has_changed().unwrap());
1313 }
1314
1315 #[tokio::test]
1316 async fn test_schedule_empty() {
1317 let env = SchedulerEnv::new().await;
1318 let (tx, _rx) = mpsc::channel(4);
1319 let mut scheduler = env.mock_flush_scheduler();
1320 let builder = VersionControlBuilder::new();
1321
1322 let version_control = Arc::new(builder.build());
1323 let (output_tx, output_rx) = oneshot::channel();
1324 let mut task = RegionFlushTask {
1325 region_id: builder.region_id(),
1326 reason: FlushReason::Others,
1327 senders: Vec::new(),
1328 request_sender: tx,
1329 access_layer: env.access_layer.clone(),
1330 listener: WorkerListener::default(),
1331 engine_config: Arc::new(MitoConfig::default()),
1332 row_group_size: None,
1333 cache_manager: Arc::new(CacheManager::default()),
1334 manifest_ctx: env
1335 .mock_manifest_context(version_control.current().version.metadata.clone())
1336 .await,
1337 index_options: IndexOptions::default(),
1338 flush_semaphore: Arc::new(Semaphore::new(2)),
1339 is_staging: false,
1340 partition_expr: None,
1341 };
1342 task.push_sender(OptionOutputTx::from(output_tx));
1343 scheduler
1344 .schedule_flush(builder.region_id(), &version_control, task)
1345 .unwrap();
1346 assert!(scheduler.region_status.is_empty());
1347 let output = output_rx.await.unwrap().unwrap();
1348 assert_eq!(output, 0);
1349 assert!(scheduler.region_status.is_empty());
1350 }
1351
    #[tokio::test]
    async fn test_schedule_pending_request() {
        // Verifies that while one flush job is running, additional flush
        // requests for the same region are merged into the running task
        // (their senders are queued) instead of spawning extra jobs, and
        // that all senders are answered once the flush succeeds.
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Write rows so the region has a non-empty mutable memtable to flush.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Three identical flush tasks for the same region.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule the first task: it occupies the region and submits one job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Scheduling the flush should have frozen the mutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule the remaining tasks; each attaches a sender so we can
        // observe its result, but no new jobs should be submitted.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Apply an edit that removes immutable memtable 0, simulating the
        // flush job persisting it.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // Still only one job overall: the merged requests did not spawn more.
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        // Every queued sender receives a successful result of 0 rows.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1434
1435 #[test]
1437 fn test_memtable_flat_sources_single_range_append_mode_behavior() {
1438 let metadata = metadata_for_test();
1440 let schema = to_flat_sst_arrow_schema(
1441 &metadata,
1442 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1443 );
1444
1445 let capacity = 16;
1448 let pk_codec = build_primary_key_codec(&metadata);
1449 let mut converter =
1450 BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
1451 let kvs = build_key_values_with_ts_seq_values(
1452 &metadata,
1453 "dup_key".to_string(),
1454 1,
1455 vec![1000i64, 1000i64].into_iter(),
1456 vec![Some(1.0f64), Some(2.0f64)].into_iter(),
1457 1,
1458 );
1459 converter.append_key_values(&kvs).unwrap();
1460 let part = converter.convert().unwrap();
1461
1462 let build_ranges = |append_mode: bool| -> MemtableRanges {
1465 let memtable = crate::memtable::bulk::BulkMemtable::new(
1466 1,
1467 crate::memtable::bulk::BulkMemtableConfig::default(),
1468 metadata.clone(),
1469 None,
1470 None,
1471 append_mode,
1472 MergeMode::LastRow,
1473 );
1474 memtable.write_bulk(part.clone()).unwrap();
1475 memtable.ranges(None, RangesOptions::for_flush()).unwrap()
1476 };
1477
1478 {
1480 let mem_ranges = build_ranges(false);
1481 assert_eq!(1, mem_ranges.ranges.len());
1482
1483 let options = RegionOptions {
1484 append_mode: false,
1485 merge_mode: Some(MergeMode::LastRow),
1486 ..Default::default()
1487 };
1488
1489 let flat_sources = memtable_flat_sources(
1490 schema.clone(),
1491 mem_ranges,
1492 &options,
1493 metadata.primary_key.len(),
1494 )
1495 .unwrap();
1496 assert!(flat_sources.encoded.is_empty());
1497 assert_eq!(1, flat_sources.sources.len());
1498
1499 let mut total_rows = 0usize;
1501 for (source, _sequence) in flat_sources.sources {
1502 match source {
1503 crate::read::FlatSource::Iter(iter) => {
1504 for rb in iter {
1505 total_rows += rb.unwrap().num_rows();
1506 }
1507 }
1508 crate::read::FlatSource::Stream(_) => unreachable!(),
1509 }
1510 }
1511 assert_eq!(1, total_rows, "dedup should keep a single row");
1512 }
1513
1514 {
1516 let mem_ranges = build_ranges(true);
1517 assert_eq!(1, mem_ranges.ranges.len());
1518
1519 let options = RegionOptions {
1520 append_mode: true,
1521 ..Default::default()
1522 };
1523
1524 let flat_sources =
1525 memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
1526 .unwrap();
1527 assert!(flat_sources.encoded.is_empty());
1528 assert_eq!(1, flat_sources.sources.len());
1529
1530 let mut total_rows = 0usize;
1531 for (source, _sequence) in flat_sources.sources {
1532 match source {
1533 crate::read::FlatSource::Iter(iter) => {
1534 for rb in iter {
1535 total_rows += rb.unwrap().num_rows();
1536 }
1537 }
1538 crate::read::FlatSource::Stream(_) => unreachable!(),
1539 }
1540 }
1541 assert_eq!(2, total_rows, "append_mode should preserve duplicates");
1542 }
1543 }
1544
    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        // Verifies that a flush scheduled while another flush is running is
        // parked as `pending_task`, and that `on_flush_success` promotes it
        // to a real job when the region still has data to flush.
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        let version_data = version_control.current();
        // Write rows so the region has a non-empty mutable memtable to flush.
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Two identical flush tasks for the same region.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // First task occupies the region and submits one job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Second task cannot run yet and is stored as the pending task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // Scheduling the first flush froze the mutable memtable (id 0).
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Apply an edit that removes immutable memtable 0, simulating the
        // first flush job persisting it.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        // New rows arrive so the pending flush still has work to do.
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        // The pending task was promoted into a second scheduled job.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
1633}