1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::{RegionId, SequenceNumber};
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
40    RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::{
44    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
45};
46use crate::metrics::{
47    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
48    INFLIGHT_FLUSH_COUNT,
49};
50use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
51use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
52use crate::read::flat_merge::FlatMergeIterator;
53use crate::read::merge::MergeReaderBuilder;
54use crate::read::{FlatSource, Source};
55use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
56use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
57use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
58use crate::request::{
59    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
60    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::FileMeta;
64use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
65use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
66use crate::worker::WorkerListener;
67
68/// Global write buffer (memtable) manager.
69///
70/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
71pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
72    /// Returns whether the engine should trigger a flush.
73    fn should_flush_engine(&self) -> bool;
74
75    /// Returns whether to stall write requests.
76    fn should_stall(&self) -> bool;
77
78    /// Reserves `mem` bytes.
79    fn reserve_mem(&self, mem: usize);
80
81    /// Tells the manager we are freeing `mem` bytes.
82    ///
83    /// We are in the process of freeing `mem` bytes, so this memory is not
84    /// counted when checking the soft limit.
85    fn schedule_free_mem(&self, mem: usize);
86
87    /// We have freed `mem` bytes.
88    fn free_mem(&self, mem: usize);
89
90    /// Returns the total memory used by memtables.
91    fn memory_usage(&self) -> usize;
92
93    /// Returns the mutable memtable memory limit.
94    ///
95    /// The write buffer manager should flush memtables when the mutable memory usage
96    /// exceeds this limit.
97    fn flush_limit(&self) -> usize;
98}
99
100pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
101
102/// Default [WriteBufferManager] implementation.
103///
104/// Inspired by RocksDB's WriteBufferManager.
105/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
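///
/// A minimal usage sketch (mirroring the reserve/freeze/free flow exercised by the
/// tests at the bottom of this file):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1000); // mutable limit = 500
/// manager.reserve_mem(600);                        // a write buffers 600 bytes
/// assert!(manager.should_flush_engine());          // mutable usage exceeds the limit
/// manager.schedule_free_mem(600);                  // mutable memtable frozen for flush
/// manager.free_mem(600);                           // flush finished, memtable dropped
/// assert_eq!(manager.memory_usage(), 0);
/// ```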
106#[derive(Debug)]
107pub struct WriteBufferManagerImpl {
108    /// Write buffer size for the engine.
109    global_write_buffer_size: usize,
110    /// Mutable memtable memory size limit.
111    mutable_limit: usize,
112    /// Memory in use (e.g. used by mutable and immutable memtables).
113    memory_used: AtomicUsize,
114    /// Memory that hasn't been scheduled to be freed (e.g. used by mutable memtables).
115    memory_active: AtomicUsize,
116    /// Optional notifier.
117    /// The manager can wake up the worker once we free the write buffer.
118    notifier: Option<watch::Sender<()>>,
119}
120
121impl WriteBufferManagerImpl {
122    /// Returns a new manager with the given `global_write_buffer_size`.
123    pub fn new(global_write_buffer_size: usize) -> Self {
124        Self {
125            global_write_buffer_size,
126            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
127            memory_used: AtomicUsize::new(0),
128            memory_active: AtomicUsize::new(0),
129            notifier: None,
130        }
131    }
132
133    /// Attaches a notifier to the manager.
134    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
135        self.notifier = Some(notifier);
136        self
137    }
138
139    /// Returns memory usage of mutable memtables.
140    pub fn mutable_usage(&self) -> usize {
141        self.memory_active.load(Ordering::Relaxed)
142    }
143
144    /// Returns the size limit for mutable memtables.
145    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
146        // Reserves half of the write buffer for mutable memtable.
147        global_write_buffer_size / 2
148    }
149}
150
151impl WriteBufferManager for WriteBufferManagerImpl {
152    fn should_flush_engine(&self) -> bool {
153        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
154        if mutable_memtable_memory_usage >= self.mutable_limit {
155            debug!(
156                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
157                mutable_memtable_memory_usage,
158                self.memory_usage(),
159                self.mutable_limit,
160                self.global_write_buffer_size,
161            );
162            return true;
163        }
164
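        // Even when mutable usage is below its limit, flush once the total usage
        // (mutable + immutable memtables) reaches the global write buffer size.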
165        let memory_usage = self.memory_used.load(Ordering::Relaxed);
166        if memory_usage >= self.global_write_buffer_size {
167            return true;
168        }
169
170        false
171    }
172
173    fn should_stall(&self) -> bool {
174        self.memory_usage() >= self.global_write_buffer_size
175    }
176
177    fn reserve_mem(&self, mem: usize) {
178        self.memory_used.fetch_add(mem, Ordering::Relaxed);
179        self.memory_active.fetch_add(mem, Ordering::Relaxed);
180    }
181
182    fn schedule_free_mem(&self, mem: usize) {
183        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
184    }
185
186    fn free_mem(&self, mem: usize) {
187        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
188        if let Some(notifier) = &self.notifier {
189            // Notifies the worker after the memory usage is decreased. When we drop the memtable
190            // outside of the worker, the worker may still stall requests because the memory usage
191            // is not updated. So we need to notify the worker to handle stalled requests again.
192            let _ = notifier.send(());
193        }
194    }
195
196    fn memory_usage(&self) -> usize {
197        self.memory_used.load(Ordering::Relaxed)
198    }
199
200    fn flush_limit(&self) -> usize {
201        self.mutable_limit
202    }
203}
204
205/// Reason for a flush task.
206#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
207pub enum FlushReason {
208    /// Other reasons.
209    Others,
210    /// Engine reaches flush threshold.
211    EngineFull,
212    /// Manual flush.
213    Manual,
214    /// Flush to alter table.
215    Alter,
216    /// Flush periodically.
217    Periodically,
218    /// Flush memtable during downgrading state.
219    Downgrading,
220    /// Enter staging mode.
221    EnterStaging,
222}
223
224impl FlushReason {
226    /// Gets the flush reason as a static str.
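    ///
    /// A small sketch of the expected output (assuming strum's default variant naming):
    ///
    /// ```ignore
    /// assert_eq!(FlushReason::EngineFull.as_str(), "EngineFull");
    /// ```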
226    fn as_str(&self) -> &'static str {
227        self.into()
228    }
229}
230
231/// Task to flush a region.
232pub(crate) struct RegionFlushTask {
233    /// Region to flush.
234    pub(crate) region_id: RegionId,
235    /// Reason to flush.
236    pub(crate) reason: FlushReason,
237    /// Flush result senders.
238    pub(crate) senders: Vec<OutputTx>,
239    /// Request sender to notify the worker.
240    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
241
242    pub(crate) access_layer: AccessLayerRef,
243    pub(crate) listener: WorkerListener,
244    pub(crate) engine_config: Arc<MitoConfig>,
245    pub(crate) row_group_size: Option<usize>,
246    pub(crate) cache_manager: CacheManagerRef,
247    pub(crate) manifest_ctx: ManifestContextRef,
248
249    /// Index options for the region.
250    pub(crate) index_options: IndexOptions,
251    /// Semaphore to control flush concurrency.
252    pub(crate) flush_semaphore: Arc<Semaphore>,
253    /// Whether the region is in staging mode.
254    pub(crate) is_staging: bool,
255    /// Partition expression of the region.
256    ///
257    /// This is used to generate the file meta.
258    pub(crate) partition_expr: Option<String>,
259}
260
261impl RegionFlushTask {
262    /// Pushes the sender if it is not `None`.
263    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
264        if let Some(sender) = sender.take_inner() {
265            self.senders.push(sender);
266        }
267    }
268
269    /// Consumes the task and notifies the senders that the job succeeded.
270    fn on_success(self) {
271        for sender in self.senders {
272            sender.send(Ok(0));
273        }
274    }
275
276    /// Sends the flush error to all waiters.
277    fn on_failure(&mut self, err: Arc<Error>) {
278        for sender in self.senders.drain(..) {
279            sender.send(Err(err.clone()).context(FlushRegionSnafu {
280                region_id: self.region_id,
281            }));
282        }
283    }
284
285    /// Converts the flush task into a background job.
286    ///
287    /// We must call this in the region worker.
288    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
289        // Get a version of this region before creating a job to get current
290        // wal entry id, sequence and immutable memtables.
291        let version_data = version_control.current();
292
293        Box::pin(async move {
294            INFLIGHT_FLUSH_COUNT.inc();
295            self.do_flush(version_data).await;
296            INFLIGHT_FLUSH_COUNT.dec();
297        })
298    }
299
300    /// Runs the flush task.
301    async fn do_flush(&mut self, version_data: VersionControlData) {
302        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
303        self.listener.on_flush_begin(self.region_id).await;
304
305        let worker_request = match self.flush_memtables(&version_data).await {
306            Ok(edit) => {
307                let memtables_to_remove = version_data
308                    .version
309                    .memtables
310                    .immutables()
311                    .iter()
312                    .map(|m| m.id())
313                    .collect();
314                let flush_finished = FlushFinished {
315                    region_id: self.region_id,
316                    // The last entry has been flushed.
317                    flushed_entry_id: version_data.last_entry_id,
318                    senders: std::mem::take(&mut self.senders),
319                    _timer: timer,
320                    edit,
321                    memtables_to_remove,
322                    is_staging: self.is_staging,
323                };
324                WorkerRequest::Background {
325                    region_id: self.region_id,
326                    notify: BackgroundNotify::FlushFinished(flush_finished),
327                }
328            }
329            Err(e) => {
330                error!(e; "Failed to flush region {}", self.region_id);
331                // Discard the timer.
332                timer.stop_and_discard();
333
334                let err = Arc::new(e);
335                self.on_failure(err.clone());
336                WorkerRequest::Background {
337                    region_id: self.region_id,
338                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
339                }
340            }
341        };
342        self.send_worker_request(worker_request).await;
343    }
344
345    /// Flushes memtables to level 0 SSTs and updates the manifest.
346    /// Returns the [RegionEdit] to apply.
347    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
348        // We must use the immutable memtable list and entry ids from the `version_data`
349        // for consistency, as others might have already modified the version in the `version_control`.
350        let version = &version_data.version;
351        let timer = FLUSH_ELAPSED
352            .with_label_values(&["flush_memtables"])
353            .start_timer();
354
355        let mut write_opts = WriteOptions {
356            write_buffer_size: self.engine_config.sst_write_buffer_size,
357            ..Default::default()
358        };
359        if let Some(row_group_size) = self.row_group_size {
360            write_opts.row_group_size = row_group_size;
361        }
362
363        let DoFlushMemtablesResult {
364            file_metas,
365            flushed_bytes,
366            series_count,
367            flush_metrics,
368        } = self.do_flush_memtables(version, write_opts).await?;
369
370        if !file_metas.is_empty() {
371            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
372        }
373
374        let mut file_ids = Vec::with_capacity(file_metas.len());
375        let mut total_rows = 0;
376        let mut total_bytes = 0;
377        for meta in &file_metas {
378            file_ids.push(meta.file_id);
379            total_rows += meta.num_rows;
380            total_bytes += meta.file_size;
381        }
382        info!(
383            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
384            self.region_id,
385            self.reason.as_str(),
386            file_ids,
387            series_count,
388            total_rows,
389            total_bytes,
390            timer.stop_and_record(),
391            flush_metrics,
392        );
393        flush_metrics.observe();
394
395        let edit = RegionEdit {
396            files_to_add: file_metas,
397            files_to_remove: Vec::new(),
398            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
399            compaction_time_window: None,
400            // The last entry has been flushed.
401            flushed_entry_id: Some(version_data.last_entry_id),
402            flushed_sequence: Some(version_data.committed_sequence),
403            committed_sequence: None,
404        };
405        info!(
406            "Applying {edit:?} to region {}, is_staging: {}",
407            self.region_id, self.is_staging
408        );
409
410        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
411
412        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
413            RegionLeaderState::Downgrading
414        } else {
415            // Check if region is in staging mode
416            let current_state = self.manifest_ctx.current_state();
417            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
418                RegionLeaderState::Staging
419            } else {
420                RegionLeaderState::Writable
421            }
422        };
423        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
424        // add a cleanup job to remove them later.
425        let version = self
426            .manifest_ctx
427            .update_manifest(expected_state, action_list, self.is_staging)
428            .await?;
429        info!(
430            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
431            self.region_id,
432            self.is_staging,
433            self.reason.as_str()
434        );
435
436        Ok(edit)
437    }
438
439    async fn do_flush_memtables(
440        &self,
441        version: &VersionRef,
442        write_opts: WriteOptions,
443    ) -> Result<DoFlushMemtablesResult> {
444        let memtables = version.memtables.immutables();
445        let mut file_metas = Vec::with_capacity(memtables.len());
446        let mut flushed_bytes = 0;
447        let mut series_count = 0;
448        let mut flush_metrics = Metrics::new(WriteType::Flush);
449        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
450        for mem in memtables {
451            if mem.is_empty() {
452                // Skip empty memtables.
453                continue;
454            }
455
456            // Compact the memtable first; this waits for the background compaction to finish.
457            let compact_start = std::time::Instant::now();
458            if let Err(e) = mem.compact(true) {
459                common_telemetry::error!(e; "Failed to compact memtable before flush");
460            }
461            let compact_cost = compact_start.elapsed();
462            flush_metrics.compact_memtable += compact_cost;
463
464            // Sets `for_flush` flag to true.
465            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
466            let num_mem_ranges = mem_ranges.ranges.len();
467
468            // Aggregate stats from all ranges
469            let num_mem_rows = mem_ranges.num_rows();
470            let memtable_series_count = mem_ranges.series_count();
471            let memtable_id = mem.id();
472            // Increases the series count for each mem range. We assume each mem range has different series,
473            // so the counter may be larger than the actual series count.
474            series_count += memtable_series_count;
475
476            if mem_ranges.is_record_batch() {
477                let flush_start = Instant::now();
478                let FlushFlatMemResult {
479                    num_encoded,
480                    num_sources,
481                    results,
482                } = self
483                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
484                    .await?;
485                for (source_idx, result) in results.into_iter().enumerate() {
486                    let (max_sequence, ssts_written, metrics) = result?;
487                    if ssts_written.is_empty() {
488                        // No data written.
489                        continue;
490                    }
491
492                    common_telemetry::debug!(
493                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
494                        self.region_id,
495                        memtable_id,
496                        source_idx,
497                        num_sources,
498                        metrics
499                    );
500
501                    flush_metrics = flush_metrics.merge(metrics);
502
503                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
504                        flushed_bytes += sst_info.file_size;
505                        Self::new_file_meta(
506                            self.region_id,
507                            max_sequence,
508                            sst_info,
509                            partition_expr.clone(),
510                        )
511                    }));
512                }
513
514                common_telemetry::debug!(
515                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
516                    self.region_id,
517                    num_sources,
518                    memtable_id,
519                    num_mem_ranges,
520                    num_encoded,
521                    num_mem_rows,
522                    flush_start.elapsed(),
523                    compact_cost,
524                );
525            } else {
526                let max_sequence = mem_ranges.max_sequence();
527                let source = memtable_source(mem_ranges, &version.options).await?;
528
529                // Flush to level 0.
530                let source = Either::Left(source);
531                let write_request = self.new_write_request(version, max_sequence, source);
532
533                let mut metrics = Metrics::new(WriteType::Flush);
534                let ssts_written = self
535                    .access_layer
536                    .write_sst(write_request, &write_opts, &mut metrics)
537                    .await?;
538                if ssts_written.is_empty() {
539                    // No data written.
540                    continue;
541                }
542
543                debug!(
544                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
545                    self.region_id, num_mem_ranges, num_mem_rows, metrics
546                );
547
548                flush_metrics = flush_metrics.merge(metrics);
549
550                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
551                    flushed_bytes += sst_info.file_size;
552                    Self::new_file_meta(
553                        self.region_id,
554                        max_sequence,
555                        sst_info,
556                        partition_expr.clone(),
557                    )
558                }));
559            };
560        }
561
562        Ok(DoFlushMemtablesResult {
563            file_metas,
564            flushed_bytes,
565            series_count,
566            flush_metrics,
567        })
568    }
569
570    async fn flush_flat_mem_ranges(
571        &self,
572        version: &VersionRef,
573        write_opts: &WriteOptions,
574        mem_ranges: MemtableRanges,
575    ) -> Result<FlushFlatMemResult> {
576        let batch_schema = to_flat_sst_arrow_schema(
577            &version.metadata,
578            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
579        );
580        let flat_sources = memtable_flat_sources(
581            batch_schema,
582            mem_ranges,
583            &version.options,
584            version.metadata.primary_key.len(),
585        )?;
586        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
587        let num_encoded = flat_sources.encoded.len();
588        for (source, max_sequence) in flat_sources.sources {
589            let source = Either::Right(source);
590            let write_request = self.new_write_request(version, max_sequence, source);
591            let access_layer = self.access_layer.clone();
592            let write_opts = write_opts.clone();
593            let semaphore = self.flush_semaphore.clone();
594            let task = common_runtime::spawn_global(async move {
595                let _permit = semaphore.acquire().await.unwrap();
596                let mut metrics = Metrics::new(WriteType::Flush);
597                let ssts = access_layer
598                    .write_sst(write_request, &write_opts, &mut metrics)
599                    .await?;
600                Ok((max_sequence, ssts, metrics))
601            });
602            tasks.push(task);
603        }
604        for (encoded, max_sequence) in flat_sources.encoded {
605            let access_layer = self.access_layer.clone();
606            let cache_manager = self.cache_manager.clone();
607            let region_id = version.metadata.region_id;
608            let semaphore = self.flush_semaphore.clone();
609            let task = common_runtime::spawn_global(async move {
610                let _permit = semaphore.acquire().await.unwrap();
611                let metrics = access_layer
612                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
613                    .await?;
614                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
615            });
616            tasks.push(task);
617        }
618        let num_sources = tasks.len();
619        let results = futures::future::try_join_all(tasks)
620            .await
621            .context(JoinSnafu)?;
622        Ok(FlushFlatMemResult {
623            num_encoded,
624            num_sources,
625            results,
626        })
627    }
628
629    fn new_file_meta(
630        region_id: RegionId,
631        max_sequence: u64,
632        sst_info: SstInfo,
633        partition_expr: Option<PartitionExpr>,
634    ) -> FileMeta {
635        FileMeta {
636            region_id,
637            file_id: sst_info.file_id,
638            time_range: sst_info.time_range,
639            level: 0,
640            file_size: sst_info.file_size,
641            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
642            available_indexes: sst_info.index_metadata.build_available_indexes(),
643            indexes: sst_info.index_metadata.build_indexes(),
644            index_file_size: sst_info.index_metadata.file_size,
645            index_version: 0,
646            num_rows: sst_info.num_rows as u64,
647            num_row_groups: sst_info.num_row_groups,
648            sequence: NonZeroU64::new(max_sequence),
649            partition_expr,
650            num_series: sst_info.num_series,
651        }
652    }
653
654    fn new_write_request(
655        &self,
656        version: &VersionRef,
657        max_sequence: u64,
658        source: Either<Source, FlatSource>,
659    ) -> SstWriteRequest {
660        SstWriteRequest {
661            op_type: OperationType::Flush,
662            metadata: version.metadata.clone(),
663            source,
664            cache_manager: self.cache_manager.clone(),
665            storage: version.options.storage.clone(),
666            max_sequence: Some(max_sequence),
667            index_options: self.index_options.clone(),
668            index_config: self.engine_config.index.clone(),
669            inverted_index_config: self.engine_config.inverted_index.clone(),
670            fulltext_index_config: self.engine_config.fulltext_index.clone(),
671            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
672            #[cfg(feature = "vector_index")]
673            vector_index_config: self.engine_config.vector_index.clone(),
674        }
675    }
676
677    /// Notifies the worker of the flush job status.
678    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
679        if let Err(e) = self
680            .request_sender
681            .send(WorkerRequestWithTime::new(request))
682            .await
683        {
684            error!(
685                "Failed to notify flush job status for region {}, request: {:?}",
686                self.region_id, e.0
687            );
688        }
689    }
690
691    /// Merges two flush tasks.
692    fn merge(&mut self, mut other: RegionFlushTask) {
693        assert_eq!(self.region_id, other.region_id);
694        // Now we only merge senders. They share the same flush reason.
695        self.senders.append(&mut other.senders);
696    }
697}
698
699struct FlushFlatMemResult {
700    num_encoded: usize,
701    num_sources: usize,
702    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
703}
704
705struct DoFlushMemtablesResult {
706    file_metas: Vec<FileMeta>,
707    flushed_bytes: u64,
708    series_count: usize,
709    flush_metrics: Metrics,
710}
711
712/// Returns a [Source] for the given memtable ranges.
713async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
714    let source = if mem_ranges.ranges.len() == 1 {
715        let only_range = mem_ranges.ranges.into_values().next().unwrap();
716        let iter = only_range.build_iter()?;
717        Source::Iter(iter)
718    } else {
719        // todo(hl): a workaround since sync version of MergeReader is wip.
720        let sources = mem_ranges
721            .ranges
722            .into_values()
723            .map(|r| r.build_iter().map(Source::Iter))
724            .collect::<Result<Vec<_>>>()?;
725        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
726        let maybe_dedup = if options.append_mode {
727            // no dedup in append mode
728            Box::new(merge_reader) as _
729        } else {
730            // dedup according to merge mode
731            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
732                MergeMode::LastRow => {
733                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
734                }
735                MergeMode::LastNonNull => Box::new(DedupReader::new(
736                    merge_reader,
737                    LastNonNull::new(false),
738                    None,
739                )) as _,
740            }
741        };
742        Source::Reader(maybe_dedup)
743    };
744    Ok(source)
745}
746
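/// Sources collected from one memtable: record batch sources still to be written and
/// already encoded ranges that can be put as-is, each paired with its max sequence number.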
747struct FlatSources {
748    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
749    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
750}
751
752/// Builds [FlatSources] for the given memtable ranges, pairing each source with its max sequence.
753fn memtable_flat_sources(
754    schema: SchemaRef,
755    mem_ranges: MemtableRanges,
756    options: &RegionOptions,
757    field_column_start: usize,
758) -> Result<FlatSources> {
759    let MemtableRanges { ranges } = mem_ranges;
760    let mut flat_sources = FlatSources {
761        sources: SmallVec::new(),
762        encoded: SmallVec::new(),
763    };
764
765    if ranges.len() == 1 {
766        let only_range = ranges.into_values().next().unwrap();
767        let max_sequence = only_range.stats().max_sequence();
768        if let Some(encoded) = only_range.encoded() {
769            flat_sources.encoded.push((encoded, max_sequence));
770        } else {
771            let iter = only_range.build_record_batch_iter(None)?;
772            // Dedup according to append mode and merge mode.
773            // Even a single range may have duplicate rows.
774            let iter = maybe_dedup_one(
775                options.append_mode,
776                options.merge_mode(),
777                field_column_start,
778                iter,
779            );
780            flat_sources
781                .sources
782                .push((FlatSource::Iter(iter), max_sequence));
783        };
784    } else {
785        // Calculate total rows from all ranges for min_flush_rows calculation
786        let total_rows: usize = ranges.values().map(|r| r.stats().num_rows()).sum();
787        let min_flush_rows = total_rows / 8;
788        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
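        // Group consecutive ranges until each merged source holds at least `min_flush_rows`
        // rows, so a memtable with many small ranges is not flushed as many tiny SSTs.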
789        let mut last_iter_rows = 0;
790        let num_ranges = ranges.len();
791        let mut input_iters = Vec::with_capacity(num_ranges);
792        let mut current_ranges = Vec::new();
793        for (_range_id, range) in ranges {
794            if let Some(encoded) = range.encoded() {
795                let max_sequence = range.stats().max_sequence();
796                flat_sources.encoded.push((encoded, max_sequence));
797                continue;
798            }
799
800            let iter = range.build_record_batch_iter(None)?;
801            input_iters.push(iter);
802            last_iter_rows += range.num_rows();
803            current_ranges.push(range);
804
805            if last_iter_rows > min_flush_rows {
806                // Calculate max_sequence from all merged ranges
807                let max_sequence = current_ranges
808                    .iter()
809                    .map(|r| r.stats().max_sequence())
810                    .max()
811                    .unwrap_or(0);
812
813                let maybe_dedup = merge_and_dedup(
814                    &schema,
815                    options.append_mode,
816                    options.merge_mode(),
817                    field_column_start,
818                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
819                )?;
820
821                flat_sources
822                    .sources
823                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
824                last_iter_rows = 0;
825                current_ranges.clear();
826            }
827        }
828
829        // Handle remaining iters.
830        if !input_iters.is_empty() {
831            let max_sequence = current_ranges
832                .iter()
833                .map(|r| r.stats().max_sequence())
834                .max()
835                .unwrap_or(0);
836
837            let maybe_dedup = merge_and_dedup(
838                &schema,
839                options.append_mode,
840                options.merge_mode(),
841                field_column_start,
842                input_iters,
843            )?;
844
845            flat_sources
846                .sources
847                .push((FlatSource::Iter(maybe_dedup), max_sequence));
848        }
849    }
850
851    Ok(flat_sources)
852}
853
854/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
855///
856/// This function is used during the flush process to combine data from multiple memtable ranges
857/// into a single stream while handling duplicate records according to the configured merge strategy.
858///
859/// # Arguments
860///
861/// * `schema` - The Arrow schema reference that defines the structure of the record batches
862/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
863///                  This is used for append-only workloads where duplicate handling is not required.
864/// * `merge_mode` - The strategy used for deduplication when not in append mode:
865///   - `MergeMode::LastRow`: Keeps the last record for each primary key
866///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
867/// * `field_column_start` - The starting column index for fields in the record batch.
868///                          Used by `MergeMode::LastNonNull` to identify which columns
869///                          contain field values versus primary key columns.
870/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
871///
872/// # Returns
873///
874/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
875/// record batches.
876///
877/// # Behavior
878///
879/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
880///    primary key and timestamp
881/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
882/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
883///    applies the specified merge mode:
884///    - `LastRow`: Removes duplicate rows, keeping only the last one
885///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
886///
887/// # Examples
888///
889/// ```ignore
890/// let merged_iter = merge_and_dedup(
891///     &schema,
892///     false,  // not append mode, apply dedup
893///     MergeMode::LastRow,
894///     2,  // fields start at column 2 after primary key columns
895///     vec![iter1, iter2, iter3],
896/// )?;
897/// ```
898pub fn merge_and_dedup(
899    schema: &SchemaRef,
900    append_mode: bool,
901    merge_mode: MergeMode,
902    field_column_start: usize,
903    input_iters: Vec<BoxedRecordBatchIterator>,
904) -> Result<BoxedRecordBatchIterator> {
905    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
906    let maybe_dedup = if append_mode {
907        // No dedup in append mode
908        Box::new(merge_iter) as _
909    } else {
910        // Dedup according to merge mode.
911        match merge_mode {
912            MergeMode::LastRow => {
913                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
914            }
915            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
916                merge_iter,
917                FlatLastNonNull::new(field_column_start, false),
918            )) as _,
919        }
920    };
921    Ok(maybe_dedup)
922}
923
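/// Applies deduplication to a single record batch iterator according to the merge mode,
/// or returns the iterator unchanged when `append_mode` is true.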
924pub fn maybe_dedup_one(
925    append_mode: bool,
926    merge_mode: MergeMode,
927    field_column_start: usize,
928    input_iter: BoxedRecordBatchIterator,
929) -> BoxedRecordBatchIterator {
930    if append_mode {
931        // No dedup in append mode
932        input_iter
933    } else {
934        // Dedup according to merge mode.
935        match merge_mode {
936            MergeMode::LastRow => {
937                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
938            }
939            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
940                input_iter,
941                FlatLastNonNull::new(field_column_start, false),
942            )),
943        }
944    }
945}
946
947/// Manages background flushes of a worker.
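///
/// A rough usage sketch (following the flow exercised by the scheduler tests below):
///
/// ```ignore
/// let mut scheduler = FlushScheduler::new(job_scheduler);
/// scheduler.schedule_flush(region_id, &version_control, task)?;
/// // ... when the background job finishes successfully:
/// if let Some((ddls, writes, bulk_writes)) = scheduler.on_flush_success(region_id) {
///     // Replay the requests that were queued while the flush was running.
/// }
/// ```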
948pub(crate) struct FlushScheduler {
949    /// Tracks regions that need to flush.
950    region_status: HashMap<RegionId, FlushStatus>,
951    /// Background job scheduler.
952    scheduler: SchedulerRef,
953}
954
955impl FlushScheduler {
956    /// Creates a new flush scheduler.
957    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
958        FlushScheduler {
959            region_status: HashMap::new(),
960            scheduler,
961        }
962    }
963
964    /// Returns true if the region already requested flush.
965    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
966        self.region_status.contains_key(&region_id)
967    }
968
969    fn schedule_flush_task(
970        &mut self,
971        version_control: &VersionControlRef,
972        task: RegionFlushTask,
973    ) -> Result<()> {
974        let region_id = task.region_id;
975
976        // If current region doesn't have flush status, we can flush the region directly.
977        if let Err(e) = version_control.freeze_mutable() {
978            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
979
980            return Err(e);
981        }
982        // Submit a flush job.
983        let job = task.into_flush_job(version_control);
984        if let Err(e) = self.scheduler.schedule(job) {
985            // If scheduler returns error, senders in the job will be dropped and waiters
986            // can get recv errors.
987            error!(e; "Failed to schedule flush job for region {}", region_id);
988
989            return Err(e);
990        }
991        Ok(())
992    }
993
994    /// Schedules a flush `task` for specific `region`.
995    pub(crate) fn schedule_flush(
996        &mut self,
997        region_id: RegionId,
998        version_control: &VersionControlRef,
999        task: RegionFlushTask,
1000    ) -> Result<()> {
1001        debug_assert_eq!(region_id, task.region_id);
1002
1003        let version = version_control.current().version;
1004        if version.memtables.is_empty() {
1005            debug_assert!(!self.region_status.contains_key(&region_id));
1006            // The region has nothing to flush.
1007            task.on_success();
1008            return Ok(());
1009        }
1010
1011        // Don't increase the counter if a region has nothing to flush.
1012        FLUSH_REQUESTS_TOTAL
1013            .with_label_values(&[task.reason.as_str()])
1014            .inc();
1015
1016        // If current region has flush status, merge the task.
1017        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
1018            // A flush task is already running or pending for this region; merge the new task into it.
1019            debug!("Merging flush task for region {}", region_id);
1020            flush_status.merge_task(task);
1021            return Ok(());
1022        }
1023
1024        self.schedule_flush_task(version_control, task)?;
1025
1026        // Add this region to status map.
1027        let _ = self.region_status.insert(
1028            region_id,
1029            FlushStatus::new(region_id, version_control.clone()),
1030        );
1031
1032        Ok(())
1033    }
1034
1035    /// Notifies the scheduler that the flush job is finished.
1036    ///
1037    /// Returns all pending requests if the region doesn't need to flush again.
1038    pub(crate) fn on_flush_success(
1039        &mut self,
1040        region_id: RegionId,
1041    ) -> Option<(
1042        Vec<SenderDdlRequest>,
1043        Vec<SenderWriteRequest>,
1044        Vec<SenderBulkRequest>,
1045    )> {
1046        let flush_status = self.region_status.get_mut(&region_id)?;
1047        // If region doesn't have any pending flush task, we need to remove it from the status.
1048        if flush_status.pending_task.is_none() {
1049            // The region doesn't have any pending flush task.
1050            // Safety: The flush status must exist.
1051            debug!(
1052                "Region {} doesn't have any pending flush task, removing it from the status",
1053                region_id
1054            );
1055            let flush_status = self.region_status.remove(&region_id).unwrap();
1056            return Some((
1057                flush_status.pending_ddls,
1058                flush_status.pending_writes,
1059                flush_status.pending_bulk_writes,
1060            ));
1061        }
1062
1063        // If region has pending task, but has nothing to flush, we need to remove it from the status.
1064        let version_data = flush_status.version_control.current();
1065        if version_data.version.memtables.is_empty() {
1066            // The region has nothing to flush, we also need to remove it from the status.
1067            // Safety: The pending task is not None.
1068            let task = flush_status.pending_task.take().unwrap();
1069            // The region has nothing to flush. We can notify pending task.
1070            task.on_success();
1071            debug!(
1072                "Region {} has nothing to flush, removing it from the status",
1073                region_id
1074            );
1075            // Safety: The flush status must exist.
1076            let flush_status = self.region_status.remove(&region_id).unwrap();
1077            return Some((
1078                flush_status.pending_ddls,
1079                flush_status.pending_writes,
1080                flush_status.pending_bulk_writes,
1081            ));
1082        }
1083
1084        // If region has pending task and has something to flush, we need to schedule it.
1085        debug!("Scheduling pending flush task for region {}", region_id);
1086        // Safety: The flush status must exist.
1087        let task = flush_status.pending_task.take().unwrap();
1088        let version_control = flush_status.version_control.clone();
1089        if let Err(err) = self.schedule_flush_task(&version_control, task) {
1090            error!(
1091                err;
1092                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1093            );
1094        }
1095        // We can flush the region again, keep it in the region status.
1096        None
1097    }
1098
1099    /// Notifies the scheduler that the flush job is failed.
1100    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1101        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1102
1103        FLUSH_FAILURE_TOTAL.inc();
1104
1105        // Remove this region.
1106        let Some(flush_status) = self.region_status.remove(&region_id) else {
1107            return;
1108        };
1109
1110        // Fast fail: cancels all pending tasks and sends error to their waiters.
1111        flush_status.on_failure(err);
1112    }
1113
1114    /// Notifies the scheduler that the region is dropped.
1115    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1116        self.remove_region_on_failure(
1117            region_id,
1118            Arc::new(RegionDroppedSnafu { region_id }.build()),
1119        );
1120    }
1121
1122    /// Notifies the scheduler that the region is closed.
1123    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1124        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1125    }
1126
1127    /// Notifies the scheduler that the region is truncated.
1128    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1129        self.remove_region_on_failure(
1130            region_id,
1131            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1132        );
1133    }
1134
1135    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1136        // Remove this region.
1137        let Some(flush_status) = self.region_status.remove(&region_id) else {
1138            return;
1139        };
1140
1141        // Notifies all pending tasks.
1142        flush_status.on_failure(err);
1143    }
1144
1145    /// Adds a DDL request to the pending queue.
1146    ///
1147    /// # Panics
1148    /// Panics if region didn't request flush.
1149    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1150        let status = self.region_status.get_mut(&request.region_id).unwrap();
1151        status.pending_ddls.push(request);
1152    }
1153
1154    /// Adds a write request to the pending queue.
1155    ///
1156    /// # Panics
1157    /// Panics if region didn't request flush.
1158    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1159        let status = self
1160            .region_status
1161            .get_mut(&request.request.region_id)
1162            .unwrap();
1163        status.pending_writes.push(request);
1164    }
1165
1166    /// Adds a bulk write request to the pending queue.
1167    ///
1168    /// # Panics
1169    /// Panics if region didn't request flush.
1170    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1171        let status = self.region_status.get_mut(&request.region_id).unwrap();
1172        status.pending_bulk_writes.push(request);
1173    }
1174
1175    /// Returns true if the region has pending DDLs.
1176    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1177        self.region_status
1178            .get(&region_id)
1179            .map(|status| !status.pending_ddls.is_empty())
1180            .unwrap_or(false)
1181    }
1182}
1183
1184impl Drop for FlushScheduler {
1185    fn drop(&mut self) {
1186        for (region_id, flush_status) in self.region_status.drain() {
1187            // We are shutting down so notify all pending tasks.
1188            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1189        }
1190    }
1191}
1192
1193/// Flush status of a region scheduled by the [FlushScheduler].
1194///
1195/// Tracks running and pending flush tasks and all pending requests of a region.
1196struct FlushStatus {
1197    /// Current region.
1198    region_id: RegionId,
1199    /// Version control of the region.
1200    version_control: VersionControlRef,
1201    /// Task waiting for next flush.
1202    pending_task: Option<RegionFlushTask>,
1203    /// Pending ddl requests.
1204    pending_ddls: Vec<SenderDdlRequest>,
1205    /// Requests waiting to write after altering the region.
1206    pending_writes: Vec<SenderWriteRequest>,
1207    /// Bulk requests waiting to write after altering the region.
1208    pending_bulk_writes: Vec<SenderBulkRequest>,
1209}
1210
1211impl FlushStatus {
1212    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1213        FlushStatus {
1214            region_id,
1215            version_control,
1216            pending_task: None,
1217            pending_ddls: Vec::new(),
1218            pending_writes: Vec::new(),
1219            pending_bulk_writes: Vec::new(),
1220        }
1221    }
1222
1223    /// Merges the task to pending task.
1224    fn merge_task(&mut self, task: RegionFlushTask) {
1225        if let Some(pending) = &mut self.pending_task {
1226            pending.merge(task);
1227        } else {
1228            self.pending_task = Some(task);
1229        }
1230    }
1231
1232    fn on_failure(self, err: Arc<Error>) {
1233        if let Some(mut task) = self.pending_task {
1234            task.on_failure(err.clone());
1235        }
1236        for ddl in self.pending_ddls {
1237            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1238                region_id: self.region_id,
1239            }));
1240        }
1241        for write_req in self.pending_writes {
1242            write_req
1243                .sender
1244                .send(Err(err.clone()).context(FlushRegionSnafu {
1245                    region_id: self.region_id,
1246                }));
1247        }
1248    }
1249}
1250
1251#[cfg(test)]
1252mod tests {
1253    use mito_codec::row_converter::build_primary_key_codec;
1254    use tokio::sync::oneshot;
1255
1256    use super::*;
1257    use crate::cache::CacheManager;
1258    use crate::memtable::bulk::part::BulkPartConverter;
1259    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1260    use crate::memtable::{Memtable, RangesOptions};
1261    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1262    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1263    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1264    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1265
1266    #[test]
1267    fn test_get_mutable_limit() {
1268        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1269        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1270        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1271        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1272    }
1273
1274    #[test]
1275    fn test_over_mutable_limit() {
1276        // Mutable limit is 500.
1277        let manager = WriteBufferManagerImpl::new(1000);
1278        manager.reserve_mem(400);
1279        assert!(!manager.should_flush_engine());
1280        assert!(!manager.should_stall());
1281
1282        // More than mutable limit.
1283        manager.reserve_mem(400);
1284        assert!(manager.should_flush_engine());
1285
1286        // Freezes mutable.
1287        manager.schedule_free_mem(400);
1288        assert!(!manager.should_flush_engine());
1289        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1290        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1291
1292        // Releases immutable.
1293        manager.free_mem(400);
1294        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1295        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1296    }
1297
1298    #[test]
1299    fn test_over_global() {
1300        // Mutable limit is 500.
1301        let manager = WriteBufferManagerImpl::new(1000);
1302        manager.reserve_mem(1100);
1303        assert!(manager.should_stall());
1304        // Global usage is still 1100.
1305        manager.schedule_free_mem(200);
1306        assert!(manager.should_flush_engine());
1307        assert!(manager.should_stall());
1308
1309        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
1310        manager.schedule_free_mem(450);
1311        assert!(manager.should_flush_engine());
1312        assert!(manager.should_stall());
1313
1314        // Now mutable is enough.
1315        manager.reserve_mem(50);
1316        assert!(manager.should_flush_engine());
1317        manager.reserve_mem(100);
1318        assert!(manager.should_flush_engine());
1319    }
1320
1321    #[test]
1322    fn test_manager_notify() {
1323        let (sender, receiver) = watch::channel(());
1324        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1325        manager.reserve_mem(500);
1326        assert!(!receiver.has_changed().unwrap());
1327        manager.schedule_free_mem(500);
1328        assert!(!receiver.has_changed().unwrap());
1329        manager.free_mem(500);
1330        assert!(receiver.has_changed().unwrap());
1331    }
1332
1333    #[tokio::test]
1334    async fn test_schedule_empty() {
1335        let env = SchedulerEnv::new().await;
1336        let (tx, _rx) = mpsc::channel(4);
1337        let mut scheduler = env.mock_flush_scheduler();
1338        let builder = VersionControlBuilder::new();
1339
1340        let version_control = Arc::new(builder.build());
1341        let (output_tx, output_rx) = oneshot::channel();
1342        let mut task = RegionFlushTask {
1343            region_id: builder.region_id(),
1344            reason: FlushReason::Others,
1345            senders: Vec::new(),
1346            request_sender: tx,
1347            access_layer: env.access_layer.clone(),
1348            listener: WorkerListener::default(),
1349            engine_config: Arc::new(MitoConfig::default()),
1350            row_group_size: None,
1351            cache_manager: Arc::new(CacheManager::default()),
1352            manifest_ctx: env
1353                .mock_manifest_context(version_control.current().version.metadata.clone())
1354                .await,
1355            index_options: IndexOptions::default(),
1356            flush_semaphore: Arc::new(Semaphore::new(2)),
1357            is_staging: false,
1358            partition_expr: None,
1359        };
1360        task.push_sender(OptionOutputTx::from(output_tx));
1361        scheduler
1362            .schedule_flush(builder.region_id(), &version_control, task)
1363            .unwrap();
1364        assert!(scheduler.region_status.is_empty());
1365        let output = output_rx.await.unwrap().unwrap();
1366        assert_eq!(output, 0);
1367        assert!(scheduler.region_status.is_empty());
1368    }
1369
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
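        // Finishing the flush should notify the senders merged from the remaining tasks.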
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    // Verifies that the single-range flat flush path skips dedup when append_mode is
    // enabled and dedups rows when it is disabled.
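    // The test writes one bulk part with two rows sharing the same primary key and
    // timestamp, then counts how many rows the sources built by `memtable_flat_sources`
    // yield in each mode.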
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

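    // Unlike `test_schedule_pending_request`, new rows arrive before the running flush
    // succeeds, so the pending task must be rescheduled as a second flush job.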
    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 2 tasks.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedule second task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
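        // Writes new rows so the pending task has data to flush when the running flush
        // finishes.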
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(2, job_scheduler.num_jobs());
        // The pending task is cleared.
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}