mito2/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info};
24use datatypes::arrow::datatypes::SchemaRef;
25use partition::expr::PartitionExpr;
26use smallvec::{SmallVec, smallvec};
27use snafu::ResultExt;
28use store_api::storage::{RegionId, SequenceNumber};
29use strum::IntoStaticStr;
30use tokio::sync::{Semaphore, mpsc, watch};
31
32use crate::access_layer::{
33    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
34};
35use crate::cache::CacheManagerRef;
36use crate::config::MitoConfig;
37use crate::error::{
38    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
39    RegionTruncatedSnafu, Result,
40};
41use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
42use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
43use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
44use crate::metrics::{
45    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
46    INFLIGHT_FLUSH_COUNT,
47};
48use crate::read::FlatSource;
49use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
50use crate::read::flat_merge::FlatMergeIterator;
51use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
52use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
53use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
54use crate::request::{
55    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
56    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
57};
58use crate::schedule::scheduler::{Job, SchedulerRef};
59use crate::sst::file::FileMeta;
60use crate::sst::parquet::{
61    DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
62};
63use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
64use crate::worker::WorkerListener;
65
66/// Global write buffer (memtable) manager.
67///
68/// Tracks write buffer (memtable) usages and decide whether the engine needs to flush.
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usages and decides whether the engine needs to flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether to trigger a flush of the engine.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so it is not considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// We have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable memory limit.
    ///
    /// The write buffer manager should flush memtables when the mutable memory usage
    /// exceeds this limit.
    fn flush_limit(&self) -> usize;
}
97
/// Shared reference to a [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
99
/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit.
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}
118
119impl WriteBufferManagerImpl {
120    /// Returns a new manager with specific `global_write_buffer_size`.
121    pub fn new(global_write_buffer_size: usize) -> Self {
122        Self {
123            global_write_buffer_size,
124            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
125            memory_used: AtomicUsize::new(0),
126            memory_active: AtomicUsize::new(0),
127            notifier: None,
128        }
129    }
130
131    /// Attaches a notifier to the manager.
132    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
133        self.notifier = Some(notifier);
134        self
135    }
136
137    /// Returns memory usage of mutable memtables.
138    pub fn mutable_usage(&self) -> usize {
139        self.memory_active.load(Ordering::Relaxed)
140    }
141
142    /// Returns the size limit for mutable memtables.
143    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
144        // Reserves half of the write buffer for mutable memtable.
145        global_write_buffer_size / 2
146    }
147}
148
149impl WriteBufferManager for WriteBufferManagerImpl {
150    fn should_flush_engine(&self) -> bool {
151        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
152        if mutable_memtable_memory_usage >= self.mutable_limit {
153            debug!(
154                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
155                mutable_memtable_memory_usage,
156                self.memory_usage(),
157                self.mutable_limit,
158                self.global_write_buffer_size,
159            );
160            return true;
161        }
162
163        let memory_usage = self.memory_used.load(Ordering::Relaxed);
164        if memory_usage >= self.global_write_buffer_size {
165            return true;
166        }
167
168        false
169    }
170
171    fn should_stall(&self) -> bool {
172        self.memory_usage() >= self.global_write_buffer_size
173    }
174
175    fn reserve_mem(&self, mem: usize) {
176        self.memory_used.fetch_add(mem, Ordering::Relaxed);
177        self.memory_active.fetch_add(mem, Ordering::Relaxed);
178    }
179
180    fn schedule_free_mem(&self, mem: usize) {
181        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
182    }
183
184    fn free_mem(&self, mem: usize) {
185        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
186        if let Some(notifier) = &self.notifier {
187            // Notifies the worker after the memory usage is decreased. When we drop the memtable
188            // outside of the worker, the worker may still stall requests because the memory usage
189            // is not updated. So we need to notify the worker to handle stalled requests again.
190            let _ = notifier.send(());
191        }
192    }
193
194    fn memory_usage(&self) -> usize {
195        self.memory_used.load(Ordering::Relaxed)
196    }
197
198    fn flush_limit(&self) -> usize {
199        self.mutable_limit
200    }
201}
202
/// Reason of a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
    /// Enter staging mode.
    EnterStaging,
    /// Flush when region is closing.
    Closing,
}
223
impl FlushReason {
    /// Get flush reason as static str.
    ///
    /// Backed by the `IntoStaticStr` derive on [FlushReason].
    fn as_str(&self) -> &'static str {
        self.into()
    }
}
230
/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Access layer used to write SSTs produced by this flush.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified when the flush begins.
    pub(crate) listener: WorkerListener,
    /// Engine config, provides SST write buffer size and index configs.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Optional override for the SST row group size.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager passed to SST write requests.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to update the region manifest after the flush.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore to control flush concurrency.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Partition expression of the region.
    ///
    /// This is used to generate the file meta.
    pub(crate) partition_expr: Option<String>,
}
260
261impl RegionFlushTask {
262    /// Push the sender if it is not none.
263    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
264        if let Some(sender) = sender.take_inner() {
265            self.senders.push(sender);
266        }
267    }
268
269    /// Consumes the task and notify the sender the job is success.
270    fn on_success(self) {
271        for sender in self.senders {
272            sender.send(Ok(0));
273        }
274    }
275
276    /// Send flush error to waiter.
277    fn on_failure(&mut self, err: Arc<Error>) {
278        for sender in self.senders.drain(..) {
279            sender.send(Err(err.clone()).context(FlushRegionSnafu {
280                region_id: self.region_id,
281            }));
282        }
283    }
284
    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Get a version of this region before creating a job to get current
        // wal entry id, sequence and immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            // Track in-flight flush jobs for observability; decremented when the flush completes.
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }
299
    /// Runs the flush task.
    ///
    /// Flushes the immutable memtables captured in `version_data`, then notifies the
    /// region worker with either a `FlushFinished` or a `FlushFailed` background request.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the immutable memtables that have been flushed; the worker
                // removes them from the version on `FlushFinished`.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry has been flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    // Waiters are handed to the worker, which notifies them on finish.
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                    flush_reason: self.reason,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer.
                timer.stop_and_discard();

                // Notify waiters of the failure here; the worker only gets the error.
                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }
345
    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // Task-level row group size overrides the default.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        // Aggregate per-file stats for the log line below.
        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // Pick the leader state the manifest update must observe: a downgrading flush
        // expects `Downgrading`; otherwise staging or writable depending on the current state.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Check if region is in staging mode
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }
441
442    async fn do_flush_memtables(
443        &self,
444        version: &VersionRef,
445        write_opts: WriteOptions,
446    ) -> Result<DoFlushMemtablesResult> {
447        let memtables = version.memtables.immutables();
448        let mut file_metas = Vec::with_capacity(memtables.len());
449        let mut flushed_bytes = 0;
450        let mut series_count = 0;
451        let mut encoded_part_count = 0;
452        let mut flush_metrics = Metrics::new(WriteType::Flush);
453        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
454        for mem in memtables {
455            if mem.is_empty() {
456                // Skip empty memtables.
457                continue;
458            }
459
460            // Compact the memtable first, this waits the background compaction to finish.
461            let compact_start = std::time::Instant::now();
462            if let Err(e) = mem.compact(true) {
463                common_telemetry::error!(e; "Failed to compact memtable before flush");
464            }
465            let compact_cost = compact_start.elapsed();
466            flush_metrics.compact_memtable += compact_cost;
467
468            // Sets `for_flush` flag to true.
469            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
470            let num_mem_ranges = mem_ranges.ranges.len();
471
472            // Aggregate stats from all ranges
473            let num_mem_rows = mem_ranges.num_rows();
474            let memtable_series_count = mem_ranges.series_count();
475            let memtable_id = mem.id();
476            // Increases series count for each mem range. We consider each mem range has different series so
477            // the counter may have more series than the actual series count.
478            series_count += memtable_series_count;
479
480            let flush_start = Instant::now();
481            let FlushFlatMemResult {
482                num_encoded,
483                num_sources,
484                results,
485            } = self
486                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
487                .await?;
488            encoded_part_count += num_encoded;
489            for (source_idx, result) in results.into_iter().enumerate() {
490                let (max_sequence, ssts_written, metrics) = result?;
491                if ssts_written.is_empty() {
492                    // No data written.
493                    continue;
494                }
495
496                common_telemetry::debug!(
497                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
498                    self.region_id,
499                    memtable_id,
500                    source_idx,
501                    num_sources,
502                    metrics
503                );
504
505                flush_metrics = flush_metrics.merge(metrics);
506
507                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
508                    flushed_bytes += sst_info.file_size;
509                    Self::new_file_meta(
510                        self.region_id,
511                        max_sequence,
512                        sst_info,
513                        partition_expr.clone(),
514                    )
515                }));
516            }
517
518            common_telemetry::debug!(
519                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
520                self.region_id,
521                num_sources,
522                memtable_id,
523                num_mem_ranges,
524                num_encoded,
525                num_mem_rows,
526                flush_start.elapsed(),
527                compact_cost,
528            );
529        }
530
531        Ok(DoFlushMemtablesResult {
532            file_metas,
533            flushed_bytes,
534            series_count,
535            encoded_part_count,
536            flush_metrics,
537        })
538    }
539
    /// Writes the flat sources built from `mem_ranges` concurrently.
    ///
    /// Spawns one task per source (regular or already-encoded), throttled by
    /// `flush_semaphore`, and collects `(max_sequence, ssts, metrics)` results.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let field_column_start =
            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            field_column_start,
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        // Regular sources go through the SST writer.
        for (source, max_sequence) in flat_sources.sources {
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                // Limits the number of concurrent SST writes.
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        // Already-encoded ranges are uploaded directly without re-encoding.
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        // Fails fast if any write task errors or panics.
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }
601
602    fn new_file_meta(
603        region_id: RegionId,
604        max_sequence: u64,
605        sst_info: SstInfo,
606        partition_expr: Option<PartitionExpr>,
607    ) -> FileMeta {
608        FileMeta {
609            region_id,
610            file_id: sst_info.file_id,
611            time_range: sst_info.time_range,
612            level: 0,
613            file_size: sst_info.file_size,
614            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
615            available_indexes: sst_info.index_metadata.build_available_indexes(),
616            indexes: sst_info.index_metadata.build_indexes(),
617            index_file_size: sst_info.index_metadata.file_size,
618            index_version: 0,
619            num_rows: sst_info.num_rows as u64,
620            num_row_groups: sst_info.num_row_groups,
621            sequence: NonZeroU64::new(max_sequence),
622            partition_expr,
623            num_series: sst_info.num_series,
624        }
625    }
626
627    fn new_write_request(
628        &self,
629        version: &VersionRef,
630        max_sequence: u64,
631        source: FlatSource,
632    ) -> SstWriteRequest {
633        let flat_format = version
634            .options
635            .sst_format
636            .map(|f| f == FormatType::Flat)
637            .unwrap_or(self.engine_config.default_experimental_flat_format);
638        SstWriteRequest {
639            op_type: OperationType::Flush,
640            metadata: version.metadata.clone(),
641            source,
642            cache_manager: self.cache_manager.clone(),
643            storage: version.options.storage.clone(),
644            max_sequence: Some(max_sequence),
645            sst_write_format: if flat_format {
646                FormatType::Flat
647            } else {
648                FormatType::PrimaryKey
649            },
650            index_options: self.index_options.clone(),
651            index_config: self.engine_config.index.clone(),
652            inverted_index_config: self.engine_config.inverted_index.clone(),
653            fulltext_index_config: self.engine_config.fulltext_index.clone(),
654            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
655            #[cfg(feature = "vector_index")]
656            vector_index_config: self.engine_config.vector_index.clone(),
657        }
658    }
659
    /// Notify flush job status.
    ///
    /// A send failure (worker channel closed) is logged and otherwise ignored.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }
673
674    /// Merge two flush tasks.
675    fn merge(&mut self, mut other: RegionFlushTask) {
676        assert_eq!(self.region_id, other.region_id);
677        // Now we only merge senders. They share the same flush reason.
678        self.senders.append(&mut other.senders);
679    }
680}
681
/// Outcome of flushing the ranges of a single memtable.
struct FlushFlatMemResult {
    /// Number of already-encoded parts uploaded directly.
    num_encoded: usize,
    /// Total number of write tasks (regular sources + encoded parts).
    num_sources: usize,
    /// Per-task results: max sequence, written SSTs and write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
687
/// Aggregated outcome of flushing all immutable memtables.
struct DoFlushMemtablesResult {
    /// Metas of all level 0 files written.
    file_metas: Vec<FileMeta>,
    /// Total bytes of the written files.
    flushed_bytes: u64,
    /// Sum of per-range series counts (may overcount, see `do_flush_memtables`).
    series_count: usize,
    /// Number of already-encoded parts uploaded directly.
    encoded_part_count: usize,
    /// Merged write metrics for the whole flush.
    flush_metrics: Metrics,
}
695
/// Sources to write during a flush, split by whether they are already encoded.
struct FlatSources {
    /// Iterator-backed sources with their max sequence numbers.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Already-encoded ranges with their max sequence numbers.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
700
/// Builds the [FlatSources] (sources with their max sequences) for the given memtable ranges.
///
/// A single range becomes one source; multiple ranges are batched into merged
/// sources of at least `ENCODE_ROW_THRESHOLD` rows. Already-encoded ranges are
/// passed through untouched.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        debug!("Flushing single flat range");

        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None, None)?;
            // Dedup according to append mode and merge mode.
            // Even single range may have duplicate rows.
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
        // Calculate total rows from non-encoded ranges.
        let total_rows: usize = ranges
            .values()
            .filter(|r| r.encoded().is_none())
            .map(|r| r.num_rows())
            .sum();
        debug!(
            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
            total_rows,
            min_flush_rows,
            ranges.len()
        );
        // Batching state: rows not yet visited, rows gathered into the current
        // batch, and the iterators/ranges that make up the current batch.
        let mut rows_remaining = total_rows;
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            // Encoded ranges bypass batching entirely.
            if let Some(encoded) = range.encoded() {
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None, None)?;
            input_iters.push(iter);
            let range_rows = range.num_rows();
            last_iter_rows += range_rows;
            rows_remaining -= range_rows;
            current_ranges.push(range);

            // Flush if we have enough rows, but don't flush if the remaining rows
            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
            if last_iter_rows >= min_flush_rows
                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
            {
                debug!(
                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                    last_iter_rows,
                    min_flush_rows,
                    input_iters.len(),
                    rows_remaining
                );

                // Calculate max_sequence from all merged ranges
                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    // Take the gathered iterators and reset the batch in one step.
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Handle remaining iters.
        if !input_iters.is_empty() {
            debug!(
                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                last_iter_rows,
                min_flush_rows,
                input_iters.len(),
                rows_remaining
            );
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}
835
836/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
837///
838/// This function is used during the flush process to combine data from multiple memtable ranges
839/// into a single stream while handling duplicate records according to the configured merge strategy.
840///
841/// # Arguments
842///
843/// * `schema` - The Arrow schema reference that defines the structure of the record batches
844/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
845///                  This is used for append-only workloads where duplicate handling is not required.
846/// * `merge_mode` - The strategy used for deduplication when not in append mode:
847///   - `MergeMode::LastRow`: Keeps the last record for each primary key
848///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
849/// * `field_column_start` - The starting column index for fields in the record batch.
850///                          Used when `MergeMode::LastNonNull` to identify which columns
851///                          contain field values versus primary key columns.
852/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
853///
854/// # Returns
855///
856/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
857/// record batches.
858///
859/// # Behavior
860///
861/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
862///    primary key and timestamp
863/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
864/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
865///    applies the specified merge mode:
866///    - `LastRow`: Removes duplicate rows, keeping only the last one
867///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
868///
869/// # Examples
870///
871/// ```ignore
872/// let merged_iter = merge_and_dedup(
873///     &schema,
874///     false,  // not append mode, apply dedup
875///     MergeMode::LastRow,
876///     2,  // fields start at column 2 after primary key columns
877///     vec![iter1, iter2, iter3],
878/// )?;
879/// ```
880pub fn merge_and_dedup(
881    schema: &SchemaRef,
882    append_mode: bool,
883    merge_mode: MergeMode,
884    field_column_start: usize,
885    input_iters: Vec<BoxedRecordBatchIterator>,
886) -> Result<BoxedRecordBatchIterator> {
887    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
888    let maybe_dedup = if append_mode {
889        // No dedup in append mode
890        Box::new(merge_iter) as _
891    } else {
892        // Dedup according to merge mode.
893        match merge_mode {
894            MergeMode::LastRow => {
895                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
896            }
897            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
898                merge_iter,
899                FlatLastNonNull::new(field_column_start, false),
900            )) as _,
901        }
902    };
903    Ok(maybe_dedup)
904}
905
906pub fn maybe_dedup_one(
907    append_mode: bool,
908    merge_mode: MergeMode,
909    field_column_start: usize,
910    input_iter: BoxedRecordBatchIterator,
911) -> BoxedRecordBatchIterator {
912    if append_mode {
913        // No dedup in append mode
914        input_iter
915    } else {
916        // Dedup according to merge mode.
917        match merge_mode {
918            MergeMode::LastRow => {
919                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
920            }
921            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
922                input_iter,
923                FlatLastNonNull::new(field_column_start, false),
924            )),
925        }
926    }
927}
928
/// Manages background flushes of a worker.
///
/// Tracks per-region flush state in `region_status` and submits flush jobs to
/// the background job scheduler.
pub(crate) struct FlushScheduler {
    /// Tracks regions need to flush.
    ///
    /// A region is present here from the moment a flush is scheduled until the
    /// flush succeeds with no pending task, fails, or the region is removed.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}
936
impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Freezes the region's mutable memtable and submits a background flush job
    /// for `task`.
    ///
    /// Returns an error if freezing fails or the job scheduler rejects the job.
    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // If current region doesn't have flush status, we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        // Submit a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If scheduler returns error, senders in the job will be dropped and waiters
            // can get recv errors.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush `task` for specific `region`.
    ///
    /// If the region has nothing to flush, the task succeeds immediately. If a
    /// flush is already tracked for the region, the task is merged into the
    /// pending task and runs after the current flush finishes.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        // Don't increase the counter if a region has nothing to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // If current region has flush status, merge the task.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            // Checks whether we can flush the region now.
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Add this region to status map.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    /// Returns `None` when another flush was (or should have been) scheduled for
    /// the region, in which case its status entry is kept.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        // If region doesn't have any pending flush task, we need to remove it from the status.
        if flush_status.pending_task.is_none() {
            // The region doesn't have any pending flush task.
            // Safety: The flush status must exist.
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If region has pending task, but has nothing to flush, we need to remove it from the status.
        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // The region has nothing to flush, we also need to remove it from the status.
            // Safety: The pending task is not None.
            let task = flush_status.pending_task.take().unwrap();
            // The region has nothing to flush. We can notify pending task.
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If region has pending task and has something to flush, we need to schedule it.
        debug!("Scheduling pending flush task for region {}", region_id);
        // Safety: The flush status must exist.
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        // We can flush the region again, keep it in the region status.
        None
    }

    /// Notifies the scheduler that the flush job is failed.
    ///
    /// Removes the region's status and fails all its pending tasks and waiters.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends error to their waiters.
        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region from the status map and fails its pending tasks and
    /// waiters with `err`. No-op if the region is not tracked.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        flush_status.on_failure(err);
    }

    /// Add ddl request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Add write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Add bulk write request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}
1165
1166impl Drop for FlushScheduler {
1167    fn drop(&mut self) {
1168        for (region_id, flush_status) in self.region_status.drain() {
1169            // We are shutting down so notify all pending tasks.
1170            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1171        }
1172    }
1173}
1174
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for next flush.
    ///
    /// Tasks scheduled while a flush is in progress are merged into this one.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1192
impl FlushStatus {
    /// Creates an empty status for `region_id` with no pending task or requests.
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task to pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    /// Consumes the status and fails the pending task, ddl waiters and write
    /// waiters with `err`.
    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
        // NOTE(review): `pending_bulk_writes` are not explicitly notified here;
        // their senders are simply dropped, so waiters observe a closed-channel
        // error rather than this flush error — confirm this is intentional.
    }
}
1232
1233#[cfg(test)]
1234mod tests {
1235    use mito_codec::row_converter::build_primary_key_codec;
1236    use tokio::sync::oneshot;
1237
1238    use super::*;
1239    use crate::cache::CacheManager;
1240    use crate::memtable::bulk::part::BulkPartConverter;
1241    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1242    use crate::memtable::{Memtable, RangesOptions};
1243    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1244    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1245    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1246    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1247
    // The mutable limit is half of the configured global write buffer size.
    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }
1255
    // Exercises reserve/schedule_free/free transitions around the mutable limit:
    // crossing the limit triggers a flush request, scheduling frees clears it,
    // and freeing releases both counters.
    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable. `schedule_free_mem` reduces active memory but not
        // total used memory.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable. `free_mem` reduces total used memory.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }
1279
    // Exceeding the global limit should stall writes and keep requesting engine
    // flushes until memory drops below the limits.
    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Now mutable is enough.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1302
    // Only `free_mem` (actually releasing memory) notifies the watch channel;
    // reserving and scheduling frees do not.
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1314
    // Scheduling a flush for a region with empty memtables completes the task
    // immediately without creating any flush status entry.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // No status is tracked because the region had nothing to flush.
        assert!(scheduler.region_status.is_empty());
        // The waiter still receives a successful result (0).
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1351
    // Tasks scheduled while a flush is already running are merged into a single
    // pending task; once the running flush succeeds and the memtables are empty,
    // the merged task completes without submitting another job.
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version: the mutable memtable was frozen into an immutable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks; they are merged into one pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        // Every merged task's waiter receives a successful result (0).
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1434
    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    // The flat flush path only produces iterators, not streams.
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
1544
1545    #[tokio::test]
1546    async fn test_schedule_pending_request_on_flush_success() {
1547        common_telemetry::init_default_ut_logging();
1548        let job_scheduler = Arc::new(VecScheduler::default());
1549        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1550        let (tx, _rx) = mpsc::channel(4);
1551        let mut scheduler = env.mock_flush_scheduler();
1552        let mut builder = VersionControlBuilder::new();
1553        // Overwrites the empty memtable builder.
1554        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1555        let version_control = Arc::new(builder.build());
1556        // Writes data to the memtable so it is not empty.
1557        let version_data = version_control.current();
1558        write_rows_to_version(&version_data.version, "host0", 0, 10);
1559        let manifest_ctx = env
1560            .mock_manifest_context(version_data.version.metadata.clone())
1561            .await;
1562        // Creates 2 tasks.
1563        let mut tasks: Vec<_> = (0..2)
1564            .map(|_| RegionFlushTask {
1565                region_id: builder.region_id(),
1566                reason: FlushReason::Others,
1567                senders: Vec::new(),
1568                request_sender: tx.clone(),
1569                access_layer: env.access_layer.clone(),
1570                listener: WorkerListener::default(),
1571                engine_config: Arc::new(MitoConfig::default()),
1572                row_group_size: None,
1573                cache_manager: Arc::new(CacheManager::default()),
1574                manifest_ctx: manifest_ctx.clone(),
1575                index_options: IndexOptions::default(),
1576                flush_semaphore: Arc::new(Semaphore::new(2)),
1577                is_staging: false,
1578                partition_expr: None,
1579            })
1580            .collect();
1581        // Schedule first task.
1582        let task = tasks.pop().unwrap();
1583        scheduler
1584            .schedule_flush(builder.region_id(), &version_control, task)
1585            .unwrap();
1586        // Should schedule 1 flush.
1587        assert_eq!(1, scheduler.region_status.len());
1588        assert_eq!(1, job_scheduler.num_jobs());
1589        // Schedule second task.
1590        let task = tasks.pop().unwrap();
1591        scheduler
1592            .schedule_flush(builder.region_id(), &version_control, task)
1593            .unwrap();
1594        assert!(
1595            scheduler
1596                .region_status
1597                .get(&builder.region_id())
1598                .unwrap()
1599                .pending_task
1600                .is_some()
1601        );
1602
1603        // Check the new version.
1604        let version_data = version_control.current();
1605        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1606        // Assumes the flush job is finished.
1607        version_control.apply_edit(
1608            Some(RegionEdit {
1609                files_to_add: Vec::new(),
1610                files_to_remove: Vec::new(),
1611                timestamp_ms: None,
1612                compaction_time_window: None,
1613                flushed_entry_id: None,
1614                flushed_sequence: None,
1615                committed_sequence: None,
1616            }),
1617            &[0],
1618            builder.file_purger(),
1619        );
1620        write_rows_to_version(&version_data.version, "host1", 0, 10);
1621        scheduler.on_flush_success(builder.region_id());
1622        assert_eq!(2, job_scheduler.num_jobs());
1623        // The pending task is cleared.
1624        assert!(
1625            scheduler
1626                .region_status
1627                .get(&builder.region_id())
1628                .unwrap()
1629                .pending_task
1630                .is_none()
1631        );
1632    }
1633}