mito2/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::{RegionId, SequenceNumber};
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
40    RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
44use crate::memtable::{
45    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
46};
47use crate::metrics::{
48    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
49    INFLIGHT_FLUSH_COUNT,
50};
51use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
52use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
53use crate::read::flat_merge::FlatMergeIterator;
54use crate::read::merge::MergeReaderBuilder;
55use crate::read::{FlatSource, Source};
56use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
57use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
58use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
59use crate::request::{
60    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
61    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
62};
63use crate::schedule::scheduler::{Job, SchedulerRef};
64use crate::sst::file::FileMeta;
65use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
66use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
67use crate::worker::WorkerListener;
68
69/// Global write buffer (memtable) manager.
70///
71/// Tracks write buffer (memtable) usages and decide whether the engine needs to flush.
72pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
73    /// Returns whether to trigger the engine.
74    fn should_flush_engine(&self) -> bool;
75
76    /// Returns whether to stall write requests.
77    fn should_stall(&self) -> bool;
78
79    /// Reserves `mem` bytes.
80    fn reserve_mem(&self, mem: usize);
81
82    /// Tells the manager we are freeing `mem` bytes.
83    ///
84    /// We are in the process of freeing `mem` bytes, so it is not considered
85    /// when checking the soft limit.
86    fn schedule_free_mem(&self, mem: usize);
87
88    /// We have freed `mem` bytes.
89    fn free_mem(&self, mem: usize);
90
91    /// Returns the total memory used by memtables.
92    fn memory_usage(&self) -> usize;
93
94    /// Returns the mutable memtable memory limit.
95    ///
96    /// The write buffer manager should flush memtables when the mutable memory usage
97    /// exceeds this limit.
98    fn flush_limit(&self) -> usize;
99}
100
101pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
102
103/// Default [WriteBufferManager] implementation.
104///
105/// Inspired by RocksDB's WriteBufferManager.
106/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
107#[derive(Debug)]
108pub struct WriteBufferManagerImpl {
109    /// Write buffer size for the engine.
110    global_write_buffer_size: usize,
111    /// Mutable memtable memory size limit.
112    mutable_limit: usize,
113    /// Memory in used (e.g. used by mutable and immutable memtables).
114    memory_used: AtomicUsize,
115    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
116    memory_active: AtomicUsize,
117    /// Optional notifier.
118    /// The manager can wake up the worker once we free the write buffer.
119    notifier: Option<watch::Sender<()>>,
120}
121
122impl WriteBufferManagerImpl {
123    /// Returns a new manager with specific `global_write_buffer_size`.
124    pub fn new(global_write_buffer_size: usize) -> Self {
125        Self {
126            global_write_buffer_size,
127            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
128            memory_used: AtomicUsize::new(0),
129            memory_active: AtomicUsize::new(0),
130            notifier: None,
131        }
132    }
133
134    /// Attaches a notifier to the manager.
135    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
136        self.notifier = Some(notifier);
137        self
138    }
139
140    /// Returns memory usage of mutable memtables.
141    pub fn mutable_usage(&self) -> usize {
142        self.memory_active.load(Ordering::Relaxed)
143    }
144
145    /// Returns the size limit for mutable memtables.
146    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
147        // Reserves half of the write buffer for mutable memtable.
148        global_write_buffer_size / 2
149    }
150}
151
152impl WriteBufferManager for WriteBufferManagerImpl {
153    fn should_flush_engine(&self) -> bool {
154        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
155        if mutable_memtable_memory_usage >= self.mutable_limit {
156            debug!(
157                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
158                mutable_memtable_memory_usage,
159                self.memory_usage(),
160                self.mutable_limit,
161                self.global_write_buffer_size,
162            );
163            return true;
164        }
165
166        let memory_usage = self.memory_used.load(Ordering::Relaxed);
167        if memory_usage >= self.global_write_buffer_size {
168            return true;
169        }
170
171        false
172    }
173
174    fn should_stall(&self) -> bool {
175        self.memory_usage() >= self.global_write_buffer_size
176    }
177
178    fn reserve_mem(&self, mem: usize) {
179        self.memory_used.fetch_add(mem, Ordering::Relaxed);
180        self.memory_active.fetch_add(mem, Ordering::Relaxed);
181    }
182
183    fn schedule_free_mem(&self, mem: usize) {
184        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
185    }
186
187    fn free_mem(&self, mem: usize) {
188        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
189        if let Some(notifier) = &self.notifier {
190            // Notifies the worker after the memory usage is decreased. When we drop the memtable
191            // outside of the worker, the worker may still stall requests because the memory usage
192            // is not updated. So we need to notify the worker to handle stalled requests again.
193            let _ = notifier.send(());
194        }
195    }
196
197    fn memory_usage(&self) -> usize {
198        self.memory_used.load(Ordering::Relaxed)
199    }
200
201    fn flush_limit(&self) -> usize {
202        self.mutable_limit
203    }
204}
205
206/// Reason of a flush task.
207#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
208pub enum FlushReason {
209    /// Other reasons.
210    Others,
211    /// Engine reaches flush threshold.
212    EngineFull,
213    /// Manual flush.
214    Manual,
215    /// Flush to alter table.
216    Alter,
217    /// Flush periodically.
218    Periodically,
219    /// Flush memtable during downgrading state.
220    Downgrading,
221    /// Enter staging mode.
222    EnterStaging,
223    /// Flush when region is closing.
224    Closing,
225}
226
227impl FlushReason {
228    /// Get flush reason as static str.
229    fn as_str(&self) -> &'static str {
230        self.into()
231    }
232}
233
234/// Task to flush a region.
235pub(crate) struct RegionFlushTask {
236    /// Region to flush.
237    pub(crate) region_id: RegionId,
238    /// Reason to flush.
239    pub(crate) reason: FlushReason,
240    /// Flush result senders.
241    pub(crate) senders: Vec<OutputTx>,
242    /// Request sender to notify the worker.
243    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
244
245    pub(crate) access_layer: AccessLayerRef,
246    pub(crate) listener: WorkerListener,
247    pub(crate) engine_config: Arc<MitoConfig>,
248    pub(crate) row_group_size: Option<usize>,
249    pub(crate) cache_manager: CacheManagerRef,
250    pub(crate) manifest_ctx: ManifestContextRef,
251
252    /// Index options for the region.
253    pub(crate) index_options: IndexOptions,
254    /// Semaphore to control flush concurrency.
255    pub(crate) flush_semaphore: Arc<Semaphore>,
256    /// Whether the region is in staging mode.
257    pub(crate) is_staging: bool,
258    /// Partition expression of the region.
259    ///
260    /// This is used to generate the file meta.
261    pub(crate) partition_expr: Option<String>,
262}
263
264impl RegionFlushTask {
265    /// Push the sender if it is not none.
266    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
267        if let Some(sender) = sender.take_inner() {
268            self.senders.push(sender);
269        }
270    }
271
272    /// Consumes the task and notify the sender the job is success.
273    fn on_success(self) {
274        for sender in self.senders {
275            sender.send(Ok(0));
276        }
277    }
278
279    /// Send flush error to waiter.
280    fn on_failure(&mut self, err: Arc<Error>) {
281        for sender in self.senders.drain(..) {
282            sender.send(Err(err.clone()).context(FlushRegionSnafu {
283                region_id: self.region_id,
284            }));
285        }
286    }
287
288    /// Converts the flush task into a background job.
289    ///
290    /// We must call this in the region worker.
291    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
292        // Get a version of this region before creating a job to get current
293        // wal entry id, sequence and immutable memtables.
294        let version_data = version_control.current();
295
296        Box::pin(async move {
297            INFLIGHT_FLUSH_COUNT.inc();
298            self.do_flush(version_data).await;
299            INFLIGHT_FLUSH_COUNT.dec();
300        })
301    }
302
303    /// Runs the flush task.
304    async fn do_flush(&mut self, version_data: VersionControlData) {
305        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
306        self.listener.on_flush_begin(self.region_id).await;
307
308        let worker_request = match self.flush_memtables(&version_data).await {
309            Ok(edit) => {
310                let memtables_to_remove = version_data
311                    .version
312                    .memtables
313                    .immutables()
314                    .iter()
315                    .map(|m| m.id())
316                    .collect();
317                let flush_finished = FlushFinished {
318                    region_id: self.region_id,
319                    // The last entry has been flushed.
320                    flushed_entry_id: version_data.last_entry_id,
321                    senders: std::mem::take(&mut self.senders),
322                    _timer: timer,
323                    edit,
324                    memtables_to_remove,
325                    is_staging: self.is_staging,
326                    flush_reason: self.reason,
327                };
328                WorkerRequest::Background {
329                    region_id: self.region_id,
330                    notify: BackgroundNotify::FlushFinished(flush_finished),
331                }
332            }
333            Err(e) => {
334                error!(e; "Failed to flush region {}", self.region_id);
335                // Discard the timer.
336                timer.stop_and_discard();
337
338                let err = Arc::new(e);
339                self.on_failure(err.clone());
340                WorkerRequest::Background {
341                    region_id: self.region_id,
342                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
343                }
344            }
345        };
346        self.send_worker_request(worker_request).await;
347    }
348
349    /// Flushes memtables to level 0 SSTs and updates the manifest.
350    /// Returns the [RegionEdit] to apply.
351    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
352        // We must use the immutable memtables list and entry ids from the `version_data`
353        // for consistency as others might already modify the version in the `version_control`.
354        let version = &version_data.version;
355        let timer = FLUSH_ELAPSED
356            .with_label_values(&["flush_memtables"])
357            .start_timer();
358
359        let mut write_opts = WriteOptions {
360            write_buffer_size: self.engine_config.sst_write_buffer_size,
361            ..Default::default()
362        };
363        if let Some(row_group_size) = self.row_group_size {
364            write_opts.row_group_size = row_group_size;
365        }
366
367        let DoFlushMemtablesResult {
368            file_metas,
369            flushed_bytes,
370            series_count,
371            encoded_part_count,
372            flush_metrics,
373        } = self.do_flush_memtables(version, write_opts).await?;
374
375        if !file_metas.is_empty() {
376            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
377        }
378
379        let mut file_ids = Vec::with_capacity(file_metas.len());
380        let mut total_rows = 0;
381        let mut total_bytes = 0;
382        for meta in &file_metas {
383            file_ids.push(meta.file_id);
384            total_rows += meta.num_rows;
385            total_bytes += meta.file_size;
386        }
387        info!(
388            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
389            self.region_id,
390            self.reason.as_str(),
391            file_ids,
392            series_count,
393            total_rows,
394            total_bytes,
395            timer.stop_and_record(),
396            encoded_part_count,
397            flush_metrics,
398        );
399        flush_metrics.observe();
400
401        let edit = RegionEdit {
402            files_to_add: file_metas,
403            files_to_remove: Vec::new(),
404            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
405            compaction_time_window: None,
406            // The last entry has been flushed.
407            flushed_entry_id: Some(version_data.last_entry_id),
408            flushed_sequence: Some(version_data.committed_sequence),
409            committed_sequence: None,
410        };
411        info!(
412            "Applying {edit:?} to region {}, is_staging: {}",
413            self.region_id, self.is_staging
414        );
415
416        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
417
418        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
419            RegionLeaderState::Downgrading
420        } else {
421            // Check if region is in staging mode
422            let current_state = self.manifest_ctx.current_state();
423            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
424                RegionLeaderState::Staging
425            } else {
426                RegionLeaderState::Writable
427            }
428        };
429        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
430        // add a cleanup job to remove them later.
431        let version = self
432            .manifest_ctx
433            .update_manifest(expected_state, action_list, self.is_staging)
434            .await?;
435        info!(
436            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
437            self.region_id,
438            self.is_staging,
439            self.reason.as_str()
440        );
441
442        Ok(edit)
443    }
444
445    async fn do_flush_memtables(
446        &self,
447        version: &VersionRef,
448        write_opts: WriteOptions,
449    ) -> Result<DoFlushMemtablesResult> {
450        let memtables = version.memtables.immutables();
451        let mut file_metas = Vec::with_capacity(memtables.len());
452        let mut flushed_bytes = 0;
453        let mut series_count = 0;
454        let mut encoded_part_count = 0;
455        let mut flush_metrics = Metrics::new(WriteType::Flush);
456        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
457        for mem in memtables {
458            if mem.is_empty() {
459                // Skip empty memtables.
460                continue;
461            }
462
463            // Compact the memtable first, this waits the background compaction to finish.
464            let compact_start = std::time::Instant::now();
465            if let Err(e) = mem.compact(true) {
466                common_telemetry::error!(e; "Failed to compact memtable before flush");
467            }
468            let compact_cost = compact_start.elapsed();
469            flush_metrics.compact_memtable += compact_cost;
470
471            // Sets `for_flush` flag to true.
472            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
473            let num_mem_ranges = mem_ranges.ranges.len();
474
475            // Aggregate stats from all ranges
476            let num_mem_rows = mem_ranges.num_rows();
477            let memtable_series_count = mem_ranges.series_count();
478            let memtable_id = mem.id();
479            // Increases series count for each mem range. We consider each mem range has different series so
480            // the counter may have more series than the actual series count.
481            series_count += memtable_series_count;
482
483            if mem_ranges.is_record_batch() {
484                let flush_start = Instant::now();
485                let FlushFlatMemResult {
486                    num_encoded,
487                    num_sources,
488                    results,
489                } = self
490                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
491                    .await?;
492                encoded_part_count += num_encoded;
493                for (source_idx, result) in results.into_iter().enumerate() {
494                    let (max_sequence, ssts_written, metrics) = result?;
495                    if ssts_written.is_empty() {
496                        // No data written.
497                        continue;
498                    }
499
500                    common_telemetry::debug!(
501                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
502                        self.region_id,
503                        memtable_id,
504                        source_idx,
505                        num_sources,
506                        metrics
507                    );
508
509                    flush_metrics = flush_metrics.merge(metrics);
510
511                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
512                        flushed_bytes += sst_info.file_size;
513                        Self::new_file_meta(
514                            self.region_id,
515                            max_sequence,
516                            sst_info,
517                            partition_expr.clone(),
518                        )
519                    }));
520                }
521
522                common_telemetry::debug!(
523                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
524                    self.region_id,
525                    num_sources,
526                    memtable_id,
527                    num_mem_ranges,
528                    num_encoded,
529                    num_mem_rows,
530                    flush_start.elapsed(),
531                    compact_cost,
532                );
533            } else {
534                let max_sequence = mem_ranges.max_sequence();
535                let source = memtable_source(mem_ranges, &version.options).await?;
536
537                // Flush to level 0.
538                let source = Either::Left(source);
539                let write_request = self.new_write_request(version, max_sequence, source);
540
541                let mut metrics = Metrics::new(WriteType::Flush);
542                let ssts_written = self
543                    .access_layer
544                    .write_sst(write_request, &write_opts, &mut metrics)
545                    .await?;
546                FLUSH_FILE_TOTAL.inc_by(ssts_written.len() as u64);
547                if ssts_written.is_empty() {
548                    // No data written.
549                    continue;
550                }
551
552                debug!(
553                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
554                    self.region_id, num_mem_ranges, num_mem_rows, metrics
555                );
556
557                flush_metrics = flush_metrics.merge(metrics);
558
559                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
560                    flushed_bytes += sst_info.file_size;
561                    Self::new_file_meta(
562                        self.region_id,
563                        max_sequence,
564                        sst_info,
565                        partition_expr.clone(),
566                    )
567                }));
568            };
569        }
570
571        Ok(DoFlushMemtablesResult {
572            file_metas,
573            flushed_bytes,
574            series_count,
575            encoded_part_count,
576            flush_metrics,
577        })
578    }
579
580    async fn flush_flat_mem_ranges(
581        &self,
582        version: &VersionRef,
583        write_opts: &WriteOptions,
584        mem_ranges: MemtableRanges,
585    ) -> Result<FlushFlatMemResult> {
586        let batch_schema = to_flat_sst_arrow_schema(
587            &version.metadata,
588            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
589        );
590        let flat_sources = memtable_flat_sources(
591            batch_schema,
592            mem_ranges,
593            &version.options,
594            version.metadata.primary_key.len(),
595        )?;
596        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
597        let num_encoded = flat_sources.encoded.len();
598        for (source, max_sequence) in flat_sources.sources {
599            let source = Either::Right(source);
600            let write_request = self.new_write_request(version, max_sequence, source);
601            let access_layer = self.access_layer.clone();
602            let write_opts = write_opts.clone();
603            let semaphore = self.flush_semaphore.clone();
604            let task = common_runtime::spawn_global(async move {
605                let _permit = semaphore.acquire().await.unwrap();
606                let mut metrics = Metrics::new(WriteType::Flush);
607                let ssts = access_layer
608                    .write_sst(write_request, &write_opts, &mut metrics)
609                    .await?;
610                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
611                Ok((max_sequence, ssts, metrics))
612            });
613            tasks.push(task);
614        }
615        for (encoded, max_sequence) in flat_sources.encoded {
616            let access_layer = self.access_layer.clone();
617            let cache_manager = self.cache_manager.clone();
618            let region_id = version.metadata.region_id;
619            let semaphore = self.flush_semaphore.clone();
620            let task = common_runtime::spawn_global(async move {
621                let _permit = semaphore.acquire().await.unwrap();
622                let metrics = access_layer
623                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
624                    .await?;
625                FLUSH_FILE_TOTAL.inc();
626                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
627            });
628            tasks.push(task);
629        }
630        let num_sources = tasks.len();
631        let results = futures::future::try_join_all(tasks)
632            .await
633            .context(JoinSnafu)?;
634        Ok(FlushFlatMemResult {
635            num_encoded,
636            num_sources,
637            results,
638        })
639    }
640
641    fn new_file_meta(
642        region_id: RegionId,
643        max_sequence: u64,
644        sst_info: SstInfo,
645        partition_expr: Option<PartitionExpr>,
646    ) -> FileMeta {
647        FileMeta {
648            region_id,
649            file_id: sst_info.file_id,
650            time_range: sst_info.time_range,
651            level: 0,
652            file_size: sst_info.file_size,
653            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
654            available_indexes: sst_info.index_metadata.build_available_indexes(),
655            indexes: sst_info.index_metadata.build_indexes(),
656            index_file_size: sst_info.index_metadata.file_size,
657            index_version: 0,
658            num_rows: sst_info.num_rows as u64,
659            num_row_groups: sst_info.num_row_groups,
660            sequence: NonZeroU64::new(max_sequence),
661            partition_expr,
662            num_series: sst_info.num_series,
663        }
664    }
665
666    fn new_write_request(
667        &self,
668        version: &VersionRef,
669        max_sequence: u64,
670        source: Either<Source, FlatSource>,
671    ) -> SstWriteRequest {
672        SstWriteRequest {
673            op_type: OperationType::Flush,
674            metadata: version.metadata.clone(),
675            source,
676            cache_manager: self.cache_manager.clone(),
677            storage: version.options.storage.clone(),
678            max_sequence: Some(max_sequence),
679            index_options: self.index_options.clone(),
680            index_config: self.engine_config.index.clone(),
681            inverted_index_config: self.engine_config.inverted_index.clone(),
682            fulltext_index_config: self.engine_config.fulltext_index.clone(),
683            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
684            #[cfg(feature = "vector_index")]
685            vector_index_config: self.engine_config.vector_index.clone(),
686        }
687    }
688
689    /// Notify flush job status.
690    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
691        if let Err(e) = self
692            .request_sender
693            .send(WorkerRequestWithTime::new(request))
694            .await
695        {
696            error!(
697                "Failed to notify flush job status for region {}, request: {:?}",
698                self.region_id, e.0
699            );
700        }
701    }
702
703    /// Merge two flush tasks.
704    fn merge(&mut self, mut other: RegionFlushTask) {
705        assert_eq!(self.region_id, other.region_id);
706        // Now we only merge senders. They share the same flush reason.
707        self.senders.append(&mut other.senders);
708    }
709}
710
711struct FlushFlatMemResult {
712    num_encoded: usize,
713    num_sources: usize,
714    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
715}
716
717struct DoFlushMemtablesResult {
718    file_metas: Vec<FileMeta>,
719    flushed_bytes: u64,
720    series_count: usize,
721    encoded_part_count: usize,
722    flush_metrics: Metrics,
723}
724
725/// Returns a [Source] for the given memtable.
726async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
727    let source = if mem_ranges.ranges.len() == 1 {
728        let only_range = mem_ranges.ranges.into_values().next().unwrap();
729        let iter = only_range.build_iter()?;
730        Source::Iter(iter)
731    } else {
732        // todo(hl): a workaround since sync version of MergeReader is wip.
733        let sources = mem_ranges
734            .ranges
735            .into_values()
736            .map(|r| r.build_iter().map(Source::Iter))
737            .collect::<Result<Vec<_>>>()?;
738        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
739        let maybe_dedup = if options.append_mode {
740            // no dedup in append mode
741            Box::new(merge_reader) as _
742        } else {
743            // dedup according to merge mode
744            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
745                MergeMode::LastRow => {
746                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
747                }
748                MergeMode::LastNonNull => Box::new(DedupReader::new(
749                    merge_reader,
750                    LastNonNull::new(false),
751                    None,
752                )) as _,
753            }
754        };
755        Source::Reader(maybe_dedup)
756    };
757    Ok(source)
758}
759
760struct FlatSources {
761    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
762    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
763}
764
765/// Returns the max sequence and [FlatSource] for the given memtable.
766fn memtable_flat_sources(
767    schema: SchemaRef,
768    mem_ranges: MemtableRanges,
769    options: &RegionOptions,
770    field_column_start: usize,
771) -> Result<FlatSources> {
772    let MemtableRanges { ranges } = mem_ranges;
773    let mut flat_sources = FlatSources {
774        sources: SmallVec::new(),
775        encoded: SmallVec::new(),
776    };
777
778    if ranges.len() == 1 {
779        debug!("Flushing single flat range");
780
781        let only_range = ranges.into_values().next().unwrap();
782        let max_sequence = only_range.stats().max_sequence();
783        if let Some(encoded) = only_range.encoded() {
784            flat_sources.encoded.push((encoded, max_sequence));
785        } else {
786            let iter = only_range.build_record_batch_iter(None)?;
787            // Dedup according to append mode and merge mode.
788            // Even single range may have duplicate rows.
789            let iter = maybe_dedup_one(
790                options.append_mode,
791                options.merge_mode(),
792                field_column_start,
793                iter,
794            );
795            flat_sources
796                .sources
797                .push((FlatSource::Iter(iter), max_sequence));
798        };
799    } else {
800        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
801        // Calculate total rows from non-encoded ranges.
802        let total_rows: usize = ranges
803            .values()
804            .filter(|r| r.encoded().is_none())
805            .map(|r| r.num_rows())
806            .sum();
807        debug!(
808            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
809            total_rows,
810            min_flush_rows,
811            ranges.len()
812        );
813        let mut rows_remaining = total_rows;
814        let mut last_iter_rows = 0;
815        let num_ranges = ranges.len();
816        let mut input_iters = Vec::with_capacity(num_ranges);
817        let mut current_ranges = Vec::new();
818        for (_range_id, range) in ranges {
819            if let Some(encoded) = range.encoded() {
820                let max_sequence = range.stats().max_sequence();
821                flat_sources.encoded.push((encoded, max_sequence));
822                continue;
823            }
824
825            let iter = range.build_record_batch_iter(None)?;
826            input_iters.push(iter);
827            let range_rows = range.num_rows();
828            last_iter_rows += range_rows;
829            rows_remaining -= range_rows;
830            current_ranges.push(range);
831
832            // Flush if we have enough rows, but don't flush if the remaining rows
833            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
834            if last_iter_rows >= min_flush_rows
835                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
836            {
837                debug!(
838                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
839                    last_iter_rows,
840                    min_flush_rows,
841                    input_iters.len(),
842                    rows_remaining
843                );
844
845                // Calculate max_sequence from all merged ranges
846                let max_sequence = current_ranges
847                    .iter()
848                    .map(|r| r.stats().max_sequence())
849                    .max()
850                    .unwrap_or(0);
851
852                let maybe_dedup = merge_and_dedup(
853                    &schema,
854                    options.append_mode,
855                    options.merge_mode(),
856                    field_column_start,
857                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
858                )?;
859
860                flat_sources
861                    .sources
862                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
863                last_iter_rows = 0;
864                current_ranges.clear();
865            }
866        }
867
868        // Handle remaining iters.
869        if !input_iters.is_empty() {
870            debug!(
871                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
872                last_iter_rows,
873                min_flush_rows,
874                input_iters.len(),
875                rows_remaining
876            );
877            let max_sequence = current_ranges
878                .iter()
879                .map(|r| r.stats().max_sequence())
880                .max()
881                .unwrap_or(0);
882
883            let maybe_dedup = merge_and_dedup(
884                &schema,
885                options.append_mode,
886                options.merge_mode(),
887                field_column_start,
888                input_iters,
889            )?;
890
891            flat_sources
892                .sources
893                .push((FlatSource::Iter(maybe_dedup), max_sequence));
894        }
895    }
896
897    Ok(flat_sources)
898}
899
900/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
901///
902/// This function is used during the flush process to combine data from multiple memtable ranges
903/// into a single stream while handling duplicate records according to the configured merge strategy.
904///
905/// # Arguments
906///
907/// * `schema` - The Arrow schema reference that defines the structure of the record batches
908/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
909///                  This is used for append-only workloads where duplicate handling is not required.
910/// * `merge_mode` - The strategy used for deduplication when not in append mode:
911///   - `MergeMode::LastRow`: Keeps the last record for each primary key
912///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
913/// * `field_column_start` - The starting column index for fields in the record batch.
914///                          Used when `MergeMode::LastNonNull` to identify which columns
915///                          contain field values versus primary key columns.
916/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
917///
918/// # Returns
919///
920/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
921/// record batches.
922///
923/// # Behavior
924///
925/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
926///    primary key and timestamp
927/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
928/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
929///    applies the specified merge mode:
930///    - `LastRow`: Removes duplicate rows, keeping only the last one
931///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
932///
933/// # Examples
934///
935/// ```ignore
936/// let merged_iter = merge_and_dedup(
937///     &schema,
938///     false,  // not append mode, apply dedup
939///     MergeMode::LastRow,
940///     2,  // fields start at column 2 after primary key columns
941///     vec![iter1, iter2, iter3],
942/// )?;
943/// ```
944pub fn merge_and_dedup(
945    schema: &SchemaRef,
946    append_mode: bool,
947    merge_mode: MergeMode,
948    field_column_start: usize,
949    input_iters: Vec<BoxedRecordBatchIterator>,
950) -> Result<BoxedRecordBatchIterator> {
951    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
952    let maybe_dedup = if append_mode {
953        // No dedup in append mode
954        Box::new(merge_iter) as _
955    } else {
956        // Dedup according to merge mode.
957        match merge_mode {
958            MergeMode::LastRow => {
959                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
960            }
961            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
962                merge_iter,
963                FlatLastNonNull::new(field_column_start, false),
964            )) as _,
965        }
966    };
967    Ok(maybe_dedup)
968}
969
970pub fn maybe_dedup_one(
971    append_mode: bool,
972    merge_mode: MergeMode,
973    field_column_start: usize,
974    input_iter: BoxedRecordBatchIterator,
975) -> BoxedRecordBatchIterator {
976    if append_mode {
977        // No dedup in append mode
978        input_iter
979    } else {
980        // Dedup according to merge mode.
981        match merge_mode {
982            MergeMode::LastRow => {
983                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
984            }
985            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
986                input_iter,
987                FlatLastNonNull::new(field_column_start, false),
988            )),
989        }
990    }
991}
992
993/// Manages background flushes of a worker.
994pub(crate) struct FlushScheduler {
995    /// Tracks regions need to flush.
996    region_status: HashMap<RegionId, FlushStatus>,
997    /// Background job scheduler.
998    scheduler: SchedulerRef,
999}
1000
1001impl FlushScheduler {
1002    /// Creates a new flush scheduler.
1003    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
1004        FlushScheduler {
1005            region_status: HashMap::new(),
1006            scheduler,
1007        }
1008    }
1009
1010    /// Returns true if the region already requested flush.
1011    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
1012        self.region_status.contains_key(&region_id)
1013    }
1014
1015    fn schedule_flush_task(
1016        &mut self,
1017        version_control: &VersionControlRef,
1018        task: RegionFlushTask,
1019    ) -> Result<()> {
1020        let region_id = task.region_id;
1021
1022        // If current region doesn't have flush status, we can flush the region directly.
1023        if let Err(e) = version_control.freeze_mutable() {
1024            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
1025
1026            return Err(e);
1027        }
1028        // Submit a flush job.
1029        let job = task.into_flush_job(version_control);
1030        if let Err(e) = self.scheduler.schedule(job) {
1031            // If scheduler returns error, senders in the job will be dropped and waiters
1032            // can get recv errors.
1033            error!(e; "Failed to schedule flush job for region {}", region_id);
1034
1035            return Err(e);
1036        }
1037        Ok(())
1038    }
1039
1040    /// Schedules a flush `task` for specific `region`.
1041    pub(crate) fn schedule_flush(
1042        &mut self,
1043        region_id: RegionId,
1044        version_control: &VersionControlRef,
1045        task: RegionFlushTask,
1046    ) -> Result<()> {
1047        debug_assert_eq!(region_id, task.region_id);
1048
1049        let version = version_control.current().version;
1050        if version.memtables.is_empty() {
1051            debug_assert!(!self.region_status.contains_key(&region_id));
1052            // The region has nothing to flush.
1053            task.on_success();
1054            return Ok(());
1055        }
1056
1057        // Don't increase the counter if a region has nothing to flush.
1058        FLUSH_REQUESTS_TOTAL
1059            .with_label_values(&[task.reason.as_str()])
1060            .inc();
1061
1062        // If current region has flush status, merge the task.
1063        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
1064            // Checks whether we can flush the region now.
1065            debug!("Merging flush task for region {}", region_id);
1066            flush_status.merge_task(task);
1067            return Ok(());
1068        }
1069
1070        self.schedule_flush_task(version_control, task)?;
1071
1072        // Add this region to status map.
1073        let _ = self.region_status.insert(
1074            region_id,
1075            FlushStatus::new(region_id, version_control.clone()),
1076        );
1077
1078        Ok(())
1079    }
1080
1081    /// Notifies the scheduler that the flush job is finished.
1082    ///
1083    /// Returns all pending requests if the region doesn't need to flush again.
1084    pub(crate) fn on_flush_success(
1085        &mut self,
1086        region_id: RegionId,
1087    ) -> Option<(
1088        Vec<SenderDdlRequest>,
1089        Vec<SenderWriteRequest>,
1090        Vec<SenderBulkRequest>,
1091    )> {
1092        let flush_status = self.region_status.get_mut(&region_id)?;
1093        // If region doesn't have any pending flush task, we need to remove it from the status.
1094        if flush_status.pending_task.is_none() {
1095            // The region doesn't have any pending flush task.
1096            // Safety: The flush status must exist.
1097            debug!(
1098                "Region {} doesn't have any pending flush task, removing it from the status",
1099                region_id
1100            );
1101            let flush_status = self.region_status.remove(&region_id).unwrap();
1102            return Some((
1103                flush_status.pending_ddls,
1104                flush_status.pending_writes,
1105                flush_status.pending_bulk_writes,
1106            ));
1107        }
1108
1109        // If region has pending task, but has nothing to flush, we need to remove it from the status.
1110        let version_data = flush_status.version_control.current();
1111        if version_data.version.memtables.is_empty() {
1112            // The region has nothing to flush, we also need to remove it from the status.
1113            // Safety: The pending task is not None.
1114            let task = flush_status.pending_task.take().unwrap();
1115            // The region has nothing to flush. We can notify pending task.
1116            task.on_success();
1117            debug!(
1118                "Region {} has nothing to flush, removing it from the status",
1119                region_id
1120            );
1121            // Safety: The flush status must exist.
1122            let flush_status = self.region_status.remove(&region_id).unwrap();
1123            return Some((
1124                flush_status.pending_ddls,
1125                flush_status.pending_writes,
1126                flush_status.pending_bulk_writes,
1127            ));
1128        }
1129
1130        // If region has pending task and has something to flush, we need to schedule it.
1131        debug!("Scheduling pending flush task for region {}", region_id);
1132        // Safety: The flush status must exist.
1133        let task = flush_status.pending_task.take().unwrap();
1134        let version_control = flush_status.version_control.clone();
1135        if let Err(err) = self.schedule_flush_task(&version_control, task) {
1136            error!(
1137                err;
1138                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1139            );
1140        }
1141        // We can flush the region again, keep it in the region status.
1142        None
1143    }
1144
1145    /// Notifies the scheduler that the flush job is failed.
1146    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1147        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1148
1149        FLUSH_FAILURE_TOTAL.inc();
1150
1151        // Remove this region.
1152        let Some(flush_status) = self.region_status.remove(&region_id) else {
1153            return;
1154        };
1155
1156        // Fast fail: cancels all pending tasks and sends error to their waiters.
1157        flush_status.on_failure(err);
1158    }
1159
1160    /// Notifies the scheduler that the region is dropped.
1161    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1162        self.remove_region_on_failure(
1163            region_id,
1164            Arc::new(RegionDroppedSnafu { region_id }.build()),
1165        );
1166    }
1167
1168    /// Notifies the scheduler that the region is closed.
1169    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1170        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1171    }
1172
1173    /// Notifies the scheduler that the region is truncated.
1174    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1175        self.remove_region_on_failure(
1176            region_id,
1177            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1178        );
1179    }
1180
1181    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1182        // Remove this region.
1183        let Some(flush_status) = self.region_status.remove(&region_id) else {
1184            return;
1185        };
1186
1187        // Notifies all pending tasks.
1188        flush_status.on_failure(err);
1189    }
1190
1191    /// Add ddl request to pending queue.
1192    ///
1193    /// # Panics
1194    /// Panics if region didn't request flush.
1195    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1196        let status = self.region_status.get_mut(&request.region_id).unwrap();
1197        status.pending_ddls.push(request);
1198    }
1199
1200    /// Add write request to pending queue.
1201    ///
1202    /// # Panics
1203    /// Panics if region didn't request flush.
1204    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1205        let status = self
1206            .region_status
1207            .get_mut(&request.request.region_id)
1208            .unwrap();
1209        status.pending_writes.push(request);
1210    }
1211
1212    /// Add bulk write request to pending queue.
1213    ///
1214    /// # Panics
1215    /// Panics if region didn't request flush.
1216    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1217        let status = self.region_status.get_mut(&request.region_id).unwrap();
1218        status.pending_bulk_writes.push(request);
1219    }
1220
1221    /// Returns true if the region has pending DDLs.
1222    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1223        self.region_status
1224            .get(&region_id)
1225            .map(|status| !status.pending_ddls.is_empty())
1226            .unwrap_or(false)
1227    }
1228}
1229
1230impl Drop for FlushScheduler {
1231    fn drop(&mut self) {
1232        for (region_id, flush_status) in self.region_status.drain() {
1233            // We are shutting down so notify all pending tasks.
1234            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1235        }
1236    }
1237}
1238
1239/// Flush status of a region scheduled by the [FlushScheduler].
1240///
1241/// Tracks running and pending flush tasks and all pending requests of a region.
1242struct FlushStatus {
1243    /// Current region.
1244    region_id: RegionId,
1245    /// Version control of the region.
1246    version_control: VersionControlRef,
1247    /// Task waiting for next flush.
1248    pending_task: Option<RegionFlushTask>,
1249    /// Pending ddl requests.
1250    pending_ddls: Vec<SenderDdlRequest>,
1251    /// Requests waiting to write after altering the region.
1252    pending_writes: Vec<SenderWriteRequest>,
1253    /// Bulk requests waiting to write after altering the region.
1254    pending_bulk_writes: Vec<SenderBulkRequest>,
1255}
1256
1257impl FlushStatus {
1258    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1259        FlushStatus {
1260            region_id,
1261            version_control,
1262            pending_task: None,
1263            pending_ddls: Vec::new(),
1264            pending_writes: Vec::new(),
1265            pending_bulk_writes: Vec::new(),
1266        }
1267    }
1268
1269    /// Merges the task to pending task.
1270    fn merge_task(&mut self, task: RegionFlushTask) {
1271        if let Some(pending) = &mut self.pending_task {
1272            pending.merge(task);
1273        } else {
1274            self.pending_task = Some(task);
1275        }
1276    }
1277
1278    fn on_failure(self, err: Arc<Error>) {
1279        if let Some(mut task) = self.pending_task {
1280            task.on_failure(err.clone());
1281        }
1282        for ddl in self.pending_ddls {
1283            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1284                region_id: self.region_id,
1285            }));
1286        }
1287        for write_req in self.pending_writes {
1288            write_req
1289                .sender
1290                .send(Err(err.clone()).context(FlushRegionSnafu {
1291                    region_id: self.region_id,
1292                }));
1293        }
1294    }
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299    use mito_codec::row_converter::build_primary_key_codec;
1300    use tokio::sync::oneshot;
1301
1302    use super::*;
1303    use crate::cache::CacheManager;
1304    use crate::memtable::bulk::part::BulkPartConverter;
1305    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1306    use crate::memtable::{Memtable, RangesOptions};
1307    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1308    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1309    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1310    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1311
1312    #[test]
1313    fn test_get_mutable_limit() {
1314        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1315        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1316        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1317        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1318    }
1319
1320    #[test]
1321    fn test_over_mutable_limit() {
1322        // Mutable limit is 500.
1323        let manager = WriteBufferManagerImpl::new(1000);
1324        manager.reserve_mem(400);
1325        assert!(!manager.should_flush_engine());
1326        assert!(!manager.should_stall());
1327
1328        // More than mutable limit.
1329        manager.reserve_mem(400);
1330        assert!(manager.should_flush_engine());
1331
1332        // Freezes mutable.
1333        manager.schedule_free_mem(400);
1334        assert!(!manager.should_flush_engine());
1335        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1336        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1337
1338        // Releases immutable.
1339        manager.free_mem(400);
1340        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1341        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1342    }
1343
1344    #[test]
1345    fn test_over_global() {
1346        // Mutable limit is 500.
1347        let manager = WriteBufferManagerImpl::new(1000);
1348        manager.reserve_mem(1100);
1349        assert!(manager.should_stall());
1350        // Global usage is still 1100.
1351        manager.schedule_free_mem(200);
1352        assert!(manager.should_flush_engine());
1353        assert!(manager.should_stall());
1354
1355        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
1356        manager.schedule_free_mem(450);
1357        assert!(manager.should_flush_engine());
1358        assert!(manager.should_stall());
1359
1360        // Now mutable is enough.
1361        manager.reserve_mem(50);
1362        assert!(manager.should_flush_engine());
1363        manager.reserve_mem(100);
1364        assert!(manager.should_flush_engine());
1365    }
1366
1367    #[test]
1368    fn test_manager_notify() {
1369        let (sender, receiver) = watch::channel(());
1370        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1371        manager.reserve_mem(500);
1372        assert!(!receiver.has_changed().unwrap());
1373        manager.schedule_free_mem(500);
1374        assert!(!receiver.has_changed().unwrap());
1375        manager.free_mem(500);
1376        assert!(receiver.has_changed().unwrap());
1377    }
1378
1379    #[tokio::test]
1380    async fn test_schedule_empty() {
1381        let env = SchedulerEnv::new().await;
1382        let (tx, _rx) = mpsc::channel(4);
1383        let mut scheduler = env.mock_flush_scheduler();
1384        let builder = VersionControlBuilder::new();
1385
1386        let version_control = Arc::new(builder.build());
1387        let (output_tx, output_rx) = oneshot::channel();
1388        let mut task = RegionFlushTask {
1389            region_id: builder.region_id(),
1390            reason: FlushReason::Others,
1391            senders: Vec::new(),
1392            request_sender: tx,
1393            access_layer: env.access_layer.clone(),
1394            listener: WorkerListener::default(),
1395            engine_config: Arc::new(MitoConfig::default()),
1396            row_group_size: None,
1397            cache_manager: Arc::new(CacheManager::default()),
1398            manifest_ctx: env
1399                .mock_manifest_context(version_control.current().version.metadata.clone())
1400                .await,
1401            index_options: IndexOptions::default(),
1402            flush_semaphore: Arc::new(Semaphore::new(2)),
1403            is_staging: false,
1404            partition_expr: None,
1405        };
1406        task.push_sender(OptionOutputTx::from(output_tx));
1407        scheduler
1408            .schedule_flush(builder.region_id(), &version_control, task)
1409            .unwrap();
1410        assert!(scheduler.region_status.is_empty());
1411        let output = output_rx.await.unwrap().unwrap();
1412        assert_eq!(output, 0);
1413        assert!(scheduler.region_status.is_empty());
1414    }
1415
1416    #[tokio::test]
1417    async fn test_schedule_pending_request() {
1418        let job_scheduler = Arc::new(VecScheduler::default());
1419        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1420        let (tx, _rx) = mpsc::channel(4);
1421        let mut scheduler = env.mock_flush_scheduler();
1422        let mut builder = VersionControlBuilder::new();
1423        // Overwrites the empty memtable builder.
1424        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1425        let version_control = Arc::new(builder.build());
1426        // Writes data to the memtable so it is not empty.
1427        let version_data = version_control.current();
1428        write_rows_to_version(&version_data.version, "host0", 0, 10);
1429        let manifest_ctx = env
1430            .mock_manifest_context(version_data.version.metadata.clone())
1431            .await;
1432        // Creates 3 tasks.
1433        let mut tasks: Vec<_> = (0..3)
1434            .map(|_| RegionFlushTask {
1435                region_id: builder.region_id(),
1436                reason: FlushReason::Others,
1437                senders: Vec::new(),
1438                request_sender: tx.clone(),
1439                access_layer: env.access_layer.clone(),
1440                listener: WorkerListener::default(),
1441                engine_config: Arc::new(MitoConfig::default()),
1442                row_group_size: None,
1443                cache_manager: Arc::new(CacheManager::default()),
1444                manifest_ctx: manifest_ctx.clone(),
1445                index_options: IndexOptions::default(),
1446                flush_semaphore: Arc::new(Semaphore::new(2)),
1447                is_staging: false,
1448                partition_expr: None,
1449            })
1450            .collect();
1451        // Schedule first task.
1452        let task = tasks.pop().unwrap();
1453        scheduler
1454            .schedule_flush(builder.region_id(), &version_control, task)
1455            .unwrap();
1456        // Should schedule 1 flush.
1457        assert_eq!(1, scheduler.region_status.len());
1458        assert_eq!(1, job_scheduler.num_jobs());
1459        // Check the new version.
1460        let version_data = version_control.current();
1461        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1462        // Schedule remaining tasks.
1463        let output_rxs: Vec<_> = tasks
1464            .into_iter()
1465            .map(|mut task| {
1466                let (output_tx, output_rx) = oneshot::channel();
1467                task.push_sender(OptionOutputTx::from(output_tx));
1468                scheduler
1469                    .schedule_flush(builder.region_id(), &version_control, task)
1470                    .unwrap();
1471                output_rx
1472            })
1473            .collect();
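        // The remaining tasks become pending because a flush is already running. Once the
        // running flush succeeds and nothing is left to flush, they complete without
        // scheduling another job.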
1474        // Assumes the flush job is finished.
1475        version_control.apply_edit(
1476            Some(RegionEdit {
1477                files_to_add: Vec::new(),
1478                files_to_remove: Vec::new(),
1479                timestamp_ms: None,
1480                compaction_time_window: None,
1481                flushed_entry_id: None,
1482                flushed_sequence: None,
1483                committed_sequence: None,
1484            }),
1485            &[0],
1486            builder.file_purger(),
1487        );
1488        scheduler.on_flush_success(builder.region_id());
1489        // No new flush task.
1490        assert_eq!(1, job_scheduler.num_jobs());
1491        // The flush status is cleared.
1492        assert!(scheduler.region_status.is_empty());
1493        for output_rx in output_rxs {
1494            let output = output_rx.await.unwrap().unwrap();
1495            assert_eq!(output, 0);
1496        }
1497    }
1498
1499    // Verifies that the single-range flat flush path skips dedup when append_mode is enabled
1499    // and dedups rows when it is disabled.
1500    #[test]
1501    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
1502        // Builds the test metadata and the flat SST arrow schema.
1503        let metadata = metadata_for_test();
1504        let schema = to_flat_sst_arrow_schema(
1505            &metadata,
1506            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1507        );
1508
1509        // Prepares a bulk part containing duplicate rows for the same primary key and timestamp:
1510        // two rows with identical keys and timestamps (ts = 1000) but different field values.
1511        let capacity = 16;
1512        let pk_codec = build_primary_key_codec(&metadata);
1513        let mut converter =
1514            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
1515        let kvs = build_key_values_with_ts_seq_values(
1516            &metadata,
1517            "dup_key".to_string(),
1518            1,
1519            vec![1000i64, 1000i64].into_iter(),
1520            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
1521            1,
1522        );
1523        converter.append_key_values(&kvs).unwrap();
1524        let part = converter.convert().unwrap();
1525
1526        // Helper to build MemtableRanges with a single range from one bulk part.
1527        // We use BulkMemtable directly because it produces record batch iterators.
1528        let build_ranges = |append_mode: bool| -> MemtableRanges {
1529            let memtable = crate::memtable::bulk::BulkMemtable::new(
1530                1,
1531                crate::memtable::bulk::BulkMemtableConfig::default(),
1532                metadata.clone(),
1533                None,
1534                None,
1535                append_mode,
1536                MergeMode::LastRow,
1537            );
1538            memtable.write_bulk(part.clone()).unwrap();
1539            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
1540        };
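        // With such a small in-memory part the ranges come back as record batch iterators,
        // so `flat_sources.encoded` is expected to stay empty in both cases below.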
1541
1542        // Case 1: append_mode = false => dedup happens, total rows should be 1
1543        {
1544            let mem_ranges = build_ranges(false);
1545            assert_eq!(1, mem_ranges.ranges.len());
1546
1547            let options = RegionOptions {
1548                append_mode: false,
1549                merge_mode: Some(MergeMode::LastRow),
1550                ..Default::default()
1551            };
1552
1553            let flat_sources = memtable_flat_sources(
1554                schema.clone(),
1555                mem_ranges,
1556                &options,
1557                metadata.primary_key.len(),
1558            )
1559            .unwrap();
1560            assert!(flat_sources.encoded.is_empty());
1561            assert_eq!(1, flat_sources.sources.len());
1562
1563            // Consumes the iterator and counts the rows.
1564            let mut total_rows = 0usize;
1565            for (source, _sequence) in flat_sources.sources {
1566                match source {
1567                    crate::read::FlatSource::Iter(iter) => {
1568                        for rb in iter {
1569                            total_rows += rb.unwrap().num_rows();
1570                        }
1571                    }
1572                    crate::read::FlatSource::Stream(_) => unreachable!(),
1573                }
1574            }
1575            assert_eq!(1, total_rows, "dedup should keep a single row");
1576        }
1577
1578        // Case 2: append_mode = true => no dedup, total rows should be 2
1579        {
1580            let mem_ranges = build_ranges(true);
1581            assert_eq!(1, mem_ranges.ranges.len());
1582
1583            let options = RegionOptions {
1584                append_mode: true,
1585                ..Default::default()
1586            };
1587
1588            let flat_sources =
1589                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
1590                    .unwrap();
1591            assert!(flat_sources.encoded.is_empty());
1592            assert_eq!(1, flat_sources.sources.len());
1593
1594            let mut total_rows = 0usize;
1595            for (source, _sequence) in flat_sources.sources {
1596                match source {
1597                    crate::read::FlatSource::Iter(iter) => {
1598                        for rb in iter {
1599                            total_rows += rb.unwrap().num_rows();
1600                        }
1601                    }
1602                    crate::read::FlatSource::Stream(_) => unreachable!(),
1603                }
1604            }
1605            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
1606        }
1607    }
1608
1609    #[tokio::test]
1610    async fn test_schedule_pending_request_on_flush_success() {
1611        common_telemetry::init_default_ut_logging();
1612        let job_scheduler = Arc::new(VecScheduler::default());
1613        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1614        let (tx, _rx) = mpsc::channel(4);
1615        let mut scheduler = env.mock_flush_scheduler();
1616        let mut builder = VersionControlBuilder::new();
1617        // Overwrites the empty memtable builder.
1618        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1619        let version_control = Arc::new(builder.build());
1620        // Writes data to the memtable so it is not empty.
1621        let version_data = version_control.current();
1622        write_rows_to_version(&version_data.version, "host0", 0, 10);
1623        let manifest_ctx = env
1624            .mock_manifest_context(version_data.version.metadata.clone())
1625            .await;
1626        // Creates 2 tasks.
1627        let mut tasks: Vec<_> = (0..2)
1628            .map(|_| RegionFlushTask {
1629                region_id: builder.region_id(),
1630                reason: FlushReason::Others,
1631                senders: Vec::new(),
1632                request_sender: tx.clone(),
1633                access_layer: env.access_layer.clone(),
1634                listener: WorkerListener::default(),
1635                engine_config: Arc::new(MitoConfig::default()),
1636                row_group_size: None,
1637                cache_manager: Arc::new(CacheManager::default()),
1638                manifest_ctx: manifest_ctx.clone(),
1639                index_options: IndexOptions::default(),
1640                flush_semaphore: Arc::new(Semaphore::new(2)),
1641                is_staging: false,
1642                partition_expr: None,
1643            })
1644            .collect();
1645        // Schedule first task.
1646        let task = tasks.pop().unwrap();
1647        scheduler
1648            .schedule_flush(builder.region_id(), &version_control, task)
1649            .unwrap();
1650        // Should schedule 1 flush.
1651        assert_eq!(1, scheduler.region_status.len());
1652        assert_eq!(1, job_scheduler.num_jobs());
1653        // Schedule second task.
1654        let task = tasks.pop().unwrap();
1655        scheduler
1656            .schedule_flush(builder.region_id(), &version_control, task)
1657            .unwrap();
1658        assert!(
1659            scheduler
1660                .region_status
1661                .get(&builder.region_id())
1662                .unwrap()
1663                .pending_task
1664                .is_some()
1665        );
1666
1667        // Check the new version.
1668        let version_data = version_control.current();
1669        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1670        // Assumes the flush job is finished.
1671        version_control.apply_edit(
1672            Some(RegionEdit {
1673                files_to_add: Vec::new(),
1674                files_to_remove: Vec::new(),
1675                timestamp_ms: None,
1676                compaction_time_window: None,
1677                flushed_entry_id: None,
1678                flushed_sequence: None,
1679                committed_sequence: None,
1680            }),
1681            &[0],
1682            builder.file_purger(),
1683        );
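        // Writes new data before reporting success so the pending task still has
        // something to flush.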
1684        write_rows_to_version(&version_data.version, "host1", 0, 10);
1685        scheduler.on_flush_success(builder.region_id());
1686        assert_eq!(2, job_scheduler.num_jobs());
1687        // The pending task is cleared.
1688        assert!(
1689            scheduler
1690                .region_status
1691                .get(&builder.region_id())
1692                .unwrap()
1693                .pending_task
1694                .is_none()
1695        );
1696    }
1697}