Skip to main content

mito2/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::region_request::RegionFlushReason;
30use store_api::storage::{RegionId, SequenceNumber};
31use strum::IntoStaticStr;
32use tokio::sync::{Semaphore, mpsc, watch};
33
34use crate::access_layer::{
35    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
36};
37use crate::cache::CacheManagerRef;
38use crate::config::MitoConfig;
39use crate::engine::region_hook::SstFileInfo;
40use crate::error::{
41    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
42    RegionTruncatedSnafu, Result,
43};
44use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
45use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
46use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
47use crate::metrics::{
48    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
49    INFLIGHT_FLUSH_COUNT,
50};
51use crate::read::FlatSource;
52use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
53use crate::read::flat_merge::FlatMergeIterator;
54use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
55use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
56use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
57use crate::request::{
58    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
59    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
60};
61use crate::schedule::scheduler::{Job, SchedulerRef};
62use crate::sst::file::FileMeta;
63use crate::sst::parquet::metadata::extract_primary_key_range;
64use crate::sst::parquet::{
65    DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
66};
67use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
68use crate::worker::WorkerListener;
69
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush
/// memtables or stall write requests.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so they are no longer considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager we have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable memory limit.
    ///
    /// The write buffer manager should flush memtables when the mutable memory usage
    /// exceeds this limit.
    fn flush_limit(&self) -> usize;
}
101
/// Shared, reference-counted handle to a [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
103
/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
///
/// Usage is tracked by two counters: `memory_used` grows in `reserve_mem` and shrinks in
/// `free_mem`, while `memory_active` grows in `reserve_mem` and shrinks as soon as the
/// memory is scheduled to be freed (`schedule_free_mem`).
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit (half of `global_write_buffer_size`).
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}
122
123impl WriteBufferManagerImpl {
124    /// Returns a new manager with specific `global_write_buffer_size`.
125    pub fn new(global_write_buffer_size: usize) -> Self {
126        Self {
127            global_write_buffer_size,
128            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
129            memory_used: AtomicUsize::new(0),
130            memory_active: AtomicUsize::new(0),
131            notifier: None,
132        }
133    }
134
135    /// Attaches a notifier to the manager.
136    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
137        self.notifier = Some(notifier);
138        self
139    }
140
141    /// Returns memory usage of mutable memtables.
142    pub fn mutable_usage(&self) -> usize {
143        self.memory_active.load(Ordering::Relaxed)
144    }
145
146    /// Returns the size limit for mutable memtables.
147    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
148        // Reserves half of the write buffer for mutable memtable.
149        global_write_buffer_size / 2
150    }
151}
152
153impl WriteBufferManager for WriteBufferManagerImpl {
154    fn should_flush_engine(&self) -> bool {
155        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
156        if mutable_memtable_memory_usage >= self.mutable_limit {
157            debug!(
158                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
159                mutable_memtable_memory_usage,
160                self.memory_usage(),
161                self.mutable_limit,
162                self.global_write_buffer_size,
163            );
164            return true;
165        }
166
167        let memory_usage = self.memory_used.load(Ordering::Relaxed);
168        if memory_usage >= self.global_write_buffer_size {
169            return true;
170        }
171
172        false
173    }
174
175    fn should_stall(&self) -> bool {
176        self.memory_usage() >= self.global_write_buffer_size
177    }
178
179    fn reserve_mem(&self, mem: usize) {
180        self.memory_used.fetch_add(mem, Ordering::Relaxed);
181        self.memory_active.fetch_add(mem, Ordering::Relaxed);
182    }
183
184    fn schedule_free_mem(&self, mem: usize) {
185        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
186    }
187
188    fn free_mem(&self, mem: usize) {
189        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
190        if let Some(notifier) = &self.notifier {
191            // Notifies the worker after the memory usage is decreased. When we drop the memtable
192            // outside of the worker, the worker may still stall requests because the memory usage
193            // is not updated. So we need to notify the worker to handle stalled requests again.
194            let _ = notifier.send(());
195        }
196    }
197
198    fn memory_usage(&self) -> usize {
199        self.memory_used.load(Ordering::Relaxed)
200    }
201
202    fn flush_limit(&self) -> usize {
203        self.mutable_limit
204    }
205}
206
/// Reason of a flush task.
///
/// The `IntoStaticStr` derive provides the static string form used when the reason
/// is written to logs.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
    /// Enter staging mode.
    EnterStaging,
    /// Flush when region is closing.
    Closing,
    /// Flush triggered before region migration.
    RegionMigration,
    /// Flush triggered by repartition procedure.
    Repartition,
    /// Flush triggered by remote WAL pruning.
    RemoteWalPrune,
}
231
232impl FlushReason {
233    /// Get flush reason as static str.
234    fn as_str(&self) -> &'static str {
235        self.into()
236    }
237}
238
239impl From<RegionFlushReason> for FlushReason {
240    fn from(reason: RegionFlushReason) -> Self {
241        match reason {
242            RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
243            RegionFlushReason::Repartition => FlushReason::Repartition,
244            RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
245            RegionFlushReason::Closing => FlushReason::Closing,
246            RegionFlushReason::Downgrading => FlushReason::Downgrading,
247        }
248    }
249}
250
/// Task to flush a region.
///
/// Carries everything the background flush job needs: the waiters to notify, the
/// channel back to the region worker and the components used to write SST files and
/// update the manifest.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Access layer used to write SST files (`write_sst` / `put_sst`).
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified when the flush begins.
    pub(crate) listener: WorkerListener,
    /// Engine configuration; provides the SST write buffer size, the default SST format
    /// flag and the index configurations.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the row group size of the write options when set.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager passed to the SST writers.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context used to read the region state, get the region hook and update
    /// the manifest.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore to control flush concurrency.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Partition expression of the region.
    ///
    /// This is used to generate the file meta.
    pub(crate) partition_expr: Option<String>,
}
280
281impl RegionFlushTask {
282    /// Push the sender if it is not none.
283    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
284        if let Some(sender) = sender.take_inner() {
285            self.senders.push(sender);
286        }
287    }
288
289    /// Consumes the task and notify the sender the job is success.
290    fn on_success(self) {
291        for sender in self.senders {
292            sender.send(Ok(0));
293        }
294    }
295
296    /// Send flush error to waiter.
297    fn on_failure(&mut self, err: Arc<Error>) {
298        for sender in self.senders.drain(..) {
299            sender.send(Err(err.clone()).context(FlushRegionSnafu {
300                region_id: self.region_id,
301            }));
302        }
303    }
304
305    /// Converts the flush task into a background job.
306    ///
307    /// We must call this in the region worker.
308    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
309        // Get a version of this region before creating a job to get current
310        // wal entry id, sequence and immutable memtables.
311        let version_data = version_control.current();
312
313        Box::pin(async move {
314            INFLIGHT_FLUSH_COUNT.inc();
315            self.do_flush(version_data).await;
316            INFLIGHT_FLUSH_COUNT.dec();
317        })
318    }
319
320    /// Runs the flush task.
321    async fn do_flush(&mut self, version_data: VersionControlData) {
322        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
323        self.listener.on_flush_begin(self.region_id).await;
324
325        let worker_request = match self.flush_memtables(&version_data).await {
326            Ok(edit) => {
327                let memtables_to_remove = version_data
328                    .version
329                    .memtables
330                    .immutables()
331                    .iter()
332                    .map(|m| m.id())
333                    .collect();
334                let flush_finished = FlushFinished {
335                    region_id: self.region_id,
336                    // The last entry has been flushed.
337                    flushed_entry_id: version_data.last_entry_id,
338                    senders: std::mem::take(&mut self.senders),
339                    _timer: timer,
340                    edit,
341                    memtables_to_remove,
342                    is_staging: self.is_staging,
343                    flush_reason: self.reason,
344                };
345                WorkerRequest::Background {
346                    region_id: self.region_id,
347                    notify: BackgroundNotify::FlushFinished(flush_finished),
348                }
349            }
350            Err(e) => {
351                error!(e; "Failed to flush region {}", self.region_id);
352                // Discard the timer.
353                timer.stop_and_discard();
354
355                let err = Arc::new(e);
356                self.on_failure(err.clone());
357                WorkerRequest::Background {
358                    region_id: self.region_id,
359                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
360                }
361            }
362        };
363        self.send_worker_request(worker_request).await;
364    }
365
366    /// Flushes memtables to level 0 SSTs and updates the manifest.
367    /// Returns the [RegionEdit] to apply.
368    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
369        // We must use the immutable memtables list and entry ids from the `version_data`
370        // for consistency as others might already modify the version in the `version_control`.
371        let version = &version_data.version;
372        let timer = FLUSH_ELAPSED
373            .with_label_values(&["flush_memtables"])
374            .start_timer();
375
376        let mut write_opts = WriteOptions {
377            write_buffer_size: self.engine_config.sst_write_buffer_size,
378            ..Default::default()
379        };
380        if let Some(row_group_size) = self.row_group_size {
381            write_opts.row_group_size = row_group_size;
382        }
383
384        let DoFlushMemtablesResult {
385            file_metas,
386            flushed_bytes,
387            series_count,
388            encoded_part_count,
389            flush_metrics,
390            sst_infos,
391        } = self.do_flush_memtables(version, write_opts).await?;
392
393        if !file_metas.is_empty() {
394            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
395        }
396
397        let mut file_ids = Vec::with_capacity(file_metas.len());
398        let mut total_rows = 0;
399        let mut total_bytes = 0;
400        for meta in &file_metas {
401            file_ids.push(meta.file_id);
402            total_rows += meta.num_rows;
403            total_bytes += meta.file_size;
404        }
405        info!(
406            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
407            self.region_id,
408            self.reason.as_str(),
409            file_ids,
410            series_count,
411            total_rows,
412            total_bytes,
413            timer.stop_and_record(),
414            encoded_part_count,
415            flush_metrics,
416        );
417        flush_metrics.observe();
418
419        let hook = self.manifest_ctx.hook();
420        if let Some(hook) = &hook {
421            let files: Vec<SstFileInfo<'_>> = sst_infos
422                .iter()
423                .zip(file_metas.iter())
424                .map(|(sst_info, file_meta)| SstFileInfo {
425                    sst_info_ref: sst_info,
426                    file_meta,
427                })
428                .collect();
429            hook.on_sst_files_written(self.region_id, &version.metadata, &files)
430                .await;
431        }
432
433        let edit = RegionEdit {
434            files_to_add: file_metas,
435            files_to_remove: Vec::new(),
436            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
437            compaction_time_window: None,
438            // The last entry has been flushed.
439            flushed_entry_id: Some(version_data.last_entry_id),
440            flushed_sequence: Some(version_data.committed_sequence),
441            committed_sequence: None,
442        };
443        info!(
444            "Applying {edit:?} to region {}, is_staging: {}",
445            self.region_id, self.is_staging
446        );
447
448        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
449
450        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
451            RegionLeaderState::Downgrading
452        } else {
453            // Check if region is in staging mode
454            let current_state = self.manifest_ctx.current_state();
455            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
456                RegionLeaderState::Staging
457            } else {
458                RegionLeaderState::Writable
459            }
460        };
461        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
462        // add a cleanup job to remove them later.
463        let manifest_version = self
464            .manifest_ctx
465            .update_manifest(expected_state, action_list, self.is_staging)
466            .await?;
467        info!(
468            "Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}",
469            self.region_id,
470            self.is_staging,
471            self.reason.as_str()
472        );
473
474        Ok(edit)
475    }
476
477    async fn do_flush_memtables(
478        &self,
479        version: &VersionRef,
480        write_opts: WriteOptions,
481    ) -> Result<DoFlushMemtablesResult> {
482        let memtables = version.memtables.immutables();
483        let mut file_metas = Vec::with_capacity(memtables.len());
484        let mut flushed_bytes = 0;
485        let mut series_count = 0;
486        let mut encoded_part_count = 0;
487        let mut flush_metrics = Metrics::new(WriteType::Flush);
488        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
489        let hook = self.manifest_ctx.hook();
490        let mut all_sst_infos = Vec::new();
491        for mem in memtables {
492            if mem.is_empty() {
493                // Skip empty memtables.
494                continue;
495            }
496
497            // Compact the memtable first, this waits the background compaction to finish.
498            let compact_start = std::time::Instant::now();
499            if let Err(e) = mem.compact(true) {
500                common_telemetry::error!(e; "Failed to compact memtable before flush");
501            }
502            let compact_cost = compact_start.elapsed();
503            flush_metrics.compact_memtable += compact_cost;
504
505            // Sets `for_flush` flag to true.
506            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
507            let num_mem_ranges = mem_ranges.ranges.len();
508
509            // Aggregate stats from all ranges
510            let num_mem_rows = mem_ranges.num_rows();
511            let memtable_series_count = mem_ranges.series_count();
512            let memtable_id = mem.id();
513            // Increases series count for each mem range. We consider each mem range has different series so
514            // the counter may have more series than the actual series count.
515            series_count += memtable_series_count;
516
517            let flush_start = Instant::now();
518            let FlushFlatMemResult {
519                num_encoded,
520                num_sources,
521                results,
522            } = self
523                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
524                .await?;
525            encoded_part_count += num_encoded;
526            for (source_idx, result) in results.into_iter().enumerate() {
527                let (max_sequence, ssts_written, metrics) = result?;
528                if ssts_written.is_empty() {
529                    // No data written.
530                    continue;
531                }
532
533                common_telemetry::debug!(
534                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
535                    self.region_id,
536                    memtable_id,
537                    source_idx,
538                    num_sources,
539                    metrics
540                );
541
542                flush_metrics = flush_metrics.merge(metrics);
543
544                for sst_info in &ssts_written {
545                    flushed_bytes += sst_info.file_size;
546                    let pk_range = sst_info
547                        .file_metadata
548                        .as_ref()
549                        .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
550                    file_metas.push(Self::new_file_meta(
551                        self.region_id,
552                        max_sequence,
553                        sst_info,
554                        partition_expr.clone(),
555                        pk_range,
556                    ));
557                }
558                if hook.is_some() {
559                    all_sst_infos.extend(ssts_written);
560                }
561            }
562
563            common_telemetry::debug!(
564                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
565                self.region_id,
566                num_sources,
567                memtable_id,
568                num_mem_ranges,
569                num_encoded,
570                num_mem_rows,
571                flush_start.elapsed(),
572                compact_cost,
573            );
574        }
575
576        Ok(DoFlushMemtablesResult {
577            file_metas,
578            flushed_bytes,
579            series_count,
580            encoded_part_count,
581            flush_metrics,
582            sst_infos: all_sst_infos,
583        })
584    }
585
586    async fn flush_flat_mem_ranges(
587        &self,
588        version: &VersionRef,
589        write_opts: &WriteOptions,
590        mem_ranges: MemtableRanges,
591    ) -> Result<FlushFlatMemResult> {
592        let batch_schema = to_flat_sst_arrow_schema(
593            &version.metadata,
594            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
595        );
596        let field_column_start =
597            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
598        let flat_sources = memtable_flat_sources(
599            batch_schema,
600            mem_ranges,
601            &version.options,
602            field_column_start,
603        )?;
604        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
605        let num_encoded = flat_sources.encoded.len();
606        for (source, max_sequence) in flat_sources.sources {
607            let write_request = self.new_write_request(version, max_sequence, source);
608            let access_layer = self.access_layer.clone();
609            let write_opts = write_opts.clone();
610            let semaphore = self.flush_semaphore.clone();
611            let task = common_runtime::spawn_global(async move {
612                let _permit = semaphore.acquire().await.unwrap();
613                let mut metrics = Metrics::new(WriteType::Flush);
614                let ssts = access_layer
615                    .write_sst(write_request, &write_opts, &mut metrics)
616                    .await?;
617                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
618                Ok((max_sequence, ssts, metrics))
619            });
620            tasks.push(task);
621        }
622        for (encoded, max_sequence) in flat_sources.encoded {
623            let access_layer = self.access_layer.clone();
624            let cache_manager = self.cache_manager.clone();
625            let region_id = version.metadata.region_id;
626            let semaphore = self.flush_semaphore.clone();
627            let task = common_runtime::spawn_global(async move {
628                let _permit = semaphore.acquire().await.unwrap();
629                let metrics = access_layer
630                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
631                    .await?;
632                FLUSH_FILE_TOTAL.inc();
633                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
634            });
635            tasks.push(task);
636        }
637        let num_sources = tasks.len();
638        let results = futures::future::try_join_all(tasks)
639            .await
640            .context(JoinSnafu)?;
641        Ok(FlushFlatMemResult {
642            num_encoded,
643            num_sources,
644            results,
645        })
646    }
647
648    fn new_file_meta(
649        region_id: RegionId,
650        max_sequence: u64,
651        sst_info: &SstInfo,
652        partition_expr: Option<PartitionExpr>,
653        primary_key_range: Option<(Bytes, Bytes)>,
654    ) -> FileMeta {
655        let (primary_key_min, primary_key_max) = match primary_key_range {
656            Some((min, max)) => (Some(min), Some(max)),
657            None => (None, None),
658        };
659        FileMeta {
660            region_id,
661            file_id: sst_info.file_id,
662            time_range: sst_info.time_range,
663            level: 0,
664            file_size: sst_info.file_size,
665            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
666            available_indexes: sst_info.index_metadata.build_available_indexes(),
667            indexes: sst_info.index_metadata.build_indexes(),
668            index_file_size: sst_info.index_metadata.file_size,
669            index_version: 0,
670            num_rows: sst_info.num_rows as u64,
671            num_row_groups: sst_info.num_row_groups,
672            sequence: NonZeroU64::new(max_sequence),
673            partition_expr,
674            num_series: sst_info.num_series,
675            primary_key_min,
676            primary_key_max,
677        }
678    }
679
680    fn new_write_request(
681        &self,
682        version: &VersionRef,
683        max_sequence: u64,
684        source: FlatSource,
685    ) -> SstWriteRequest {
686        let flat_format = version
687            .options
688            .sst_format
689            .map(|f| f == FormatType::Flat)
690            .unwrap_or(self.engine_config.default_flat_format);
691        SstWriteRequest {
692            op_type: OperationType::Flush,
693            metadata: version.metadata.clone(),
694            source,
695            cache_manager: self.cache_manager.clone(),
696            storage: version.options.storage.clone(),
697            max_sequence: Some(max_sequence),
698            sst_write_format: if flat_format {
699                FormatType::Flat
700            } else {
701                FormatType::PrimaryKey
702            },
703            index_options: self.index_options.clone(),
704            index_config: self.engine_config.index.clone(),
705            inverted_index_config: self.engine_config.inverted_index.clone(),
706            fulltext_index_config: self.engine_config.fulltext_index.clone(),
707            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
708            #[cfg(feature = "vector_index")]
709            vector_index_config: self.engine_config.vector_index.clone(),
710        }
711    }
712
713    /// Notify flush job status.
714    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
715        if let Err(e) = self
716            .request_sender
717            .send(WorkerRequestWithTime::new(request))
718            .await
719        {
720            error!(
721                "Failed to notify flush job status for region {}, request: {:?}",
722                self.region_id, e.0
723            );
724        }
725    }
726
727    /// Merge two flush tasks.
728    fn merge(&mut self, mut other: RegionFlushTask) {
729        assert_eq!(self.region_id, other.region_id);
730        // Now we only merge senders. They share the same flush reason.
731        self.senders.append(&mut other.senders);
732    }
733}
734
/// Outcome of flushing the ranges of one memtable.
struct FlushFlatMemResult {
    /// Number of already encoded ranges that were uploaded.
    num_encoded: usize,
    /// Total number of spawned write tasks (iterator sources plus encoded ranges).
    num_sources: usize,
    /// Result of each write task: the max sequence, the written SSTs and the write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
740
/// Aggregated outcome of flushing all immutable memtables of a version.
struct DoFlushMemtablesResult {
    /// File metas of all written SSTs.
    file_metas: Vec<FileMeta>,
    /// Sum of the file sizes of the written SSTs.
    flushed_bytes: u64,
    /// Sum of the series counts reported by the memtable ranges; may exceed the actual
    /// number of series.
    series_count: usize,
    /// Number of already encoded ranges that were uploaded.
    encoded_part_count: usize,
    /// Merged write metrics, including the time spent compacting memtables.
    flush_metrics: Metrics,
    /// Infos of the written SSTs; only populated when the manifest context has a hook.
    sst_infos: Vec<SstInfo>,
}
749
/// Inputs of a memtable flush, each paired with the max sequence of the data it holds.
struct FlatSources {
    /// Iterator-based sources that still have to be written as SSTs.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Ranges that are already encoded.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
754
755/// Returns the max sequence and [FlatSource] for the given memtable.
756fn memtable_flat_sources(
757    schema: SchemaRef,
758    mem_ranges: MemtableRanges,
759    options: &RegionOptions,
760    field_column_start: usize,
761) -> Result<FlatSources> {
762    let MemtableRanges { ranges } = mem_ranges;
763    let mut flat_sources = FlatSources {
764        sources: SmallVec::new(),
765        encoded: SmallVec::new(),
766    };
767
768    if ranges.len() == 1 {
769        debug!("Flushing single flat range");
770
771        let only_range = ranges.into_values().next().unwrap();
772        let max_sequence = only_range.stats().max_sequence();
773        if let Some(encoded) = only_range.encoded() {
774            flat_sources.encoded.push((encoded, max_sequence));
775        } else {
776            let iter = only_range.build_record_batch_iter(None, None)?;
777            // Dedup according to append mode and merge mode.
778            // Even single range may have duplicate rows.
779            let iter = maybe_dedup_one(
780                options.append_mode,
781                options.merge_mode(),
782                field_column_start,
783                iter,
784            );
785            flat_sources
786                .sources
787                .push((FlatSource::new_iter(schema, iter), max_sequence));
788        };
789    } else {
790        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
791        // Calculate total rows from non-encoded ranges.
792        let total_rows: usize = ranges
793            .values()
794            .filter(|r| r.encoded().is_none())
795            .map(|r| r.num_rows())
796            .sum();
797        debug!(
798            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
799            total_rows,
800            min_flush_rows,
801            ranges.len()
802        );
803        let mut rows_remaining = total_rows;
804        let mut last_iter_rows = 0;
805        let num_ranges = ranges.len();
806        let mut input_iters = Vec::with_capacity(num_ranges);
807        let mut current_ranges = Vec::new();
808        for (_range_id, range) in ranges {
809            if let Some(encoded) = range.encoded() {
810                let max_sequence = range.stats().max_sequence();
811                flat_sources.encoded.push((encoded, max_sequence));
812                continue;
813            }
814
815            let iter = range.build_record_batch_iter(None, None)?;
816            input_iters.push(iter);
817            let range_rows = range.num_rows();
818            last_iter_rows += range_rows;
819            rows_remaining -= range_rows;
820            current_ranges.push(range);
821
822            // Flush if we have enough rows, but don't flush if the remaining rows
823            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
824            if last_iter_rows >= min_flush_rows
825                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
826            {
827                debug!(
828                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
829                    last_iter_rows,
830                    min_flush_rows,
831                    input_iters.len(),
832                    rows_remaining
833                );
834
835                // Calculate max_sequence from all merged ranges
836                let max_sequence = current_ranges
837                    .iter()
838                    .map(|r| r.stats().max_sequence())
839                    .max()
840                    .unwrap_or(0);
841
842                let maybe_dedup = merge_and_dedup(
843                    &schema,
844                    options.append_mode,
845                    options.merge_mode(),
846                    field_column_start,
847                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
848                )?;
849
850                flat_sources.sources.push((
851                    FlatSource::new_iter(schema.clone(), maybe_dedup),
852                    max_sequence,
853                ));
854                last_iter_rows = 0;
855                current_ranges.clear();
856            }
857        }
858
859        // Handle remaining iters.
860        if !input_iters.is_empty() {
861            debug!(
862                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
863                last_iter_rows,
864                min_flush_rows,
865                input_iters.len(),
866                rows_remaining
867            );
868            let max_sequence = current_ranges
869                .iter()
870                .map(|r| r.stats().max_sequence())
871                .max()
872                .unwrap_or(0);
873
874            let maybe_dedup = merge_and_dedup(
875                &schema,
876                options.append_mode,
877                options.merge_mode(),
878                field_column_start,
879                input_iters,
880            )?;
881
882            flat_sources
883                .sources
884                .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
885        }
886    }
887
888    Ok(flat_sources)
889}
890
891/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
892///
893/// This function is used during the flush process to combine data from multiple memtable ranges
894/// into a single stream while handling duplicate records according to the configured merge strategy.
895///
896/// # Arguments
897///
898/// * `schema` - The Arrow schema reference that defines the structure of the record batches
899/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
900///                  This is used for append-only workloads where duplicate handling is not required.
901/// * `merge_mode` - The strategy used for deduplication when not in append mode:
902///   - `MergeMode::LastRow`: Keeps the last record for each primary key
903///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
904/// * `field_column_start` - The starting column index for fields in the record batch.
905///                          Used when `MergeMode::LastNonNull` to identify which columns
906///                          contain field values versus primary key columns.
907/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
908///
909/// # Returns
910///
911/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
912/// record batches.
913///
914/// # Behavior
915///
916/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
917///    primary key and timestamp
918/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
919/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
920///    applies the specified merge mode:
921///    - `LastRow`: Removes duplicate rows, keeping only the last one
922///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
923///
924/// # Examples
925///
926/// ```ignore
927/// let merged_iter = merge_and_dedup(
928///     &schema,
929///     false,  // not append mode, apply dedup
930///     MergeMode::LastRow,
931///     2,  // fields start at column 2 after primary key columns
932///     vec![iter1, iter2, iter3],
933/// )?;
934/// ```
935pub fn merge_and_dedup(
936    schema: &SchemaRef,
937    append_mode: bool,
938    merge_mode: MergeMode,
939    field_column_start: usize,
940    input_iters: Vec<BoxedRecordBatchIterator>,
941) -> Result<BoxedRecordBatchIterator> {
942    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
943    let maybe_dedup = if append_mode {
944        // No dedup in append mode
945        Box::new(merge_iter) as _
946    } else {
947        // Dedup according to merge mode.
948        match merge_mode {
949            MergeMode::LastRow => {
950                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
951            }
952            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
953                merge_iter,
954                FlatLastNonNull::new(field_column_start, false),
955            )) as _,
956        }
957    };
958    Ok(maybe_dedup)
959}
960
961pub fn maybe_dedup_one(
962    append_mode: bool,
963    merge_mode: MergeMode,
964    field_column_start: usize,
965    input_iter: BoxedRecordBatchIterator,
966) -> BoxedRecordBatchIterator {
967    if append_mode {
968        // No dedup in append mode
969        input_iter
970    } else {
971        // Dedup according to merge mode.
972        match merge_mode {
973            MergeMode::LastRow => {
974                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
975            }
976            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
977                input_iter,
978                FlatLastNonNull::new(field_column_start, false),
979            )),
980        }
981    }
982}
983
/// Manages background flushes of a worker.
///
/// Keeps one [FlushStatus] per region that has a flush in progress and submits
/// flush jobs to the background scheduler.
pub(crate) struct FlushScheduler {
    /// Flush status of each region that has requested a flush, keyed by region id.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}
991
992impl FlushScheduler {
993    /// Creates a new flush scheduler.
994    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
995        FlushScheduler {
996            region_status: HashMap::new(),
997            scheduler,
998        }
999    }
1000
1001    /// Returns true if the region already requested flush.
1002    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
1003        self.region_status.contains_key(&region_id)
1004    }
1005
1006    fn schedule_flush_task(
1007        &mut self,
1008        version_control: &VersionControlRef,
1009        task: RegionFlushTask,
1010    ) -> Result<()> {
1011        let region_id = task.region_id;
1012
1013        // If current region doesn't have flush status, we can flush the region directly.
1014        if let Err(e) = version_control.freeze_mutable() {
1015            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
1016
1017            return Err(e);
1018        }
1019        // Submit a flush job.
1020        let job = task.into_flush_job(version_control);
1021        if let Err(e) = self.scheduler.schedule(job) {
1022            // If scheduler returns error, senders in the job will be dropped and waiters
1023            // can get recv errors.
1024            error!(e; "Failed to schedule flush job for region {}", region_id);
1025
1026            return Err(e);
1027        }
1028        Ok(())
1029    }
1030
1031    /// Schedules a flush `task` for specific `region`.
1032    pub(crate) fn schedule_flush(
1033        &mut self,
1034        region_id: RegionId,
1035        version_control: &VersionControlRef,
1036        task: RegionFlushTask,
1037    ) -> Result<()> {
1038        debug_assert_eq!(region_id, task.region_id);
1039
1040        let version = version_control.current().version;
1041        if version.memtables.is_empty() {
1042            debug_assert!(!self.region_status.contains_key(&region_id));
1043            // The region has nothing to flush.
1044            task.on_success();
1045            return Ok(());
1046        }
1047
1048        // Don't increase the counter if a region has nothing to flush.
1049        FLUSH_REQUESTS_TOTAL
1050            .with_label_values(&[task.reason.as_str()])
1051            .inc();
1052
1053        // If current region has flush status, merge the task.
1054        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
1055            // Checks whether we can flush the region now.
1056            debug!("Merging flush task for region {}", region_id);
1057            flush_status.merge_task(task);
1058            return Ok(());
1059        }
1060
1061        self.schedule_flush_task(version_control, task)?;
1062
1063        // Add this region to status map.
1064        let _ = self.region_status.insert(
1065            region_id,
1066            FlushStatus::new(region_id, version_control.clone()),
1067        );
1068
1069        Ok(())
1070    }
1071
1072    /// Notifies the scheduler that the flush job is finished.
1073    ///
1074    /// Returns all pending requests if the region doesn't need to flush again.
1075    pub(crate) fn on_flush_success(
1076        &mut self,
1077        region_id: RegionId,
1078    ) -> Option<(
1079        Vec<SenderDdlRequest>,
1080        Vec<SenderWriteRequest>,
1081        Vec<SenderBulkRequest>,
1082    )> {
1083        let flush_status = self.region_status.get_mut(&region_id)?;
1084        // If region doesn't have any pending flush task, we need to remove it from the status.
1085        if flush_status.pending_task.is_none() {
1086            // The region doesn't have any pending flush task.
1087            // Safety: The flush status must exist.
1088            debug!(
1089                "Region {} doesn't have any pending flush task, removing it from the status",
1090                region_id
1091            );
1092            let flush_status = self.region_status.remove(&region_id).unwrap();
1093            return Some((
1094                flush_status.pending_ddls,
1095                flush_status.pending_writes,
1096                flush_status.pending_bulk_writes,
1097            ));
1098        }
1099
1100        // If region has pending task, but has nothing to flush, we need to remove it from the status.
1101        let version_data = flush_status.version_control.current();
1102        if version_data.version.memtables.is_empty() {
1103            // The region has nothing to flush, we also need to remove it from the status.
1104            // Safety: The pending task is not None.
1105            let task = flush_status.pending_task.take().unwrap();
1106            // The region has nothing to flush. We can notify pending task.
1107            task.on_success();
1108            debug!(
1109                "Region {} has nothing to flush, removing it from the status",
1110                region_id
1111            );
1112            // Safety: The flush status must exist.
1113            let flush_status = self.region_status.remove(&region_id).unwrap();
1114            return Some((
1115                flush_status.pending_ddls,
1116                flush_status.pending_writes,
1117                flush_status.pending_bulk_writes,
1118            ));
1119        }
1120
1121        // If region has pending task and has something to flush, we need to schedule it.
1122        debug!("Scheduling pending flush task for region {}", region_id);
1123        // Safety: The flush status must exist.
1124        let task = flush_status.pending_task.take().unwrap();
1125        let version_control = flush_status.version_control.clone();
1126        if let Err(err) = self.schedule_flush_task(&version_control, task) {
1127            error!(
1128                err;
1129                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1130            );
1131        }
1132        // We can flush the region again, keep it in the region status.
1133        None
1134    }
1135
1136    /// Notifies the scheduler that the flush job is failed.
1137    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1138        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1139
1140        FLUSH_FAILURE_TOTAL.inc();
1141
1142        // Remove this region.
1143        let Some(flush_status) = self.region_status.remove(&region_id) else {
1144            return;
1145        };
1146
1147        // Fast fail: cancels all pending tasks and sends error to their waiters.
1148        flush_status.on_failure(err);
1149    }
1150
1151    /// Notifies the scheduler that the region is dropped.
1152    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1153        self.remove_region_on_failure(
1154            region_id,
1155            Arc::new(RegionDroppedSnafu { region_id }.build()),
1156        );
1157    }
1158
1159    /// Notifies the scheduler that the region is closed.
1160    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1161        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1162    }
1163
1164    /// Notifies the scheduler that the region is truncated.
1165    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1166        self.remove_region_on_failure(
1167            region_id,
1168            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1169        );
1170    }
1171
1172    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1173        // Remove this region.
1174        let Some(flush_status) = self.region_status.remove(&region_id) else {
1175            return;
1176        };
1177
1178        // Notifies all pending tasks.
1179        flush_status.on_failure(err);
1180    }
1181
1182    /// Add ddl request to pending queue.
1183    ///
1184    /// # Panics
1185    /// Panics if region didn't request flush.
1186    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1187        let status = self.region_status.get_mut(&request.region_id).unwrap();
1188        status.pending_ddls.push(request);
1189    }
1190
1191    /// Add write request to pending queue.
1192    ///
1193    /// # Panics
1194    /// Panics if region didn't request flush.
1195    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1196        let status = self
1197            .region_status
1198            .get_mut(&request.request.region_id)
1199            .unwrap();
1200        status.pending_writes.push(request);
1201    }
1202
1203    /// Add bulk write request to pending queue.
1204    ///
1205    /// # Panics
1206    /// Panics if region didn't request flush.
1207    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1208        let status = self.region_status.get_mut(&request.region_id).unwrap();
1209        status.pending_bulk_writes.push(request);
1210    }
1211
1212    /// Returns true if the region has pending DDLs.
1213    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1214        self.region_status
1215            .get(&region_id)
1216            .map(|status| !status.pending_ddls.is_empty())
1217            .unwrap_or(false)
1218    }
1219}
1220
1221impl Drop for FlushScheduler {
1222    fn drop(&mut self) {
1223        for (region_id, flush_status) in self.region_status.drain() {
1224            // We are shutting down so notify all pending tasks.
1225            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1226        }
1227    }
1228}
1229
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks the pending flush task and all pending requests of a region while
/// the region is in the scheduler's status map.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for next flush. Later tasks are merged into it.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests, handed back by `on_flush_success` once the region
    /// doesn't need to flush again.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests queued while the region is flushing, handed back together
    /// with the pending ddl requests.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests queued while the region is flushing, handed back
    /// together with the pending ddl requests.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1247
1248impl FlushStatus {
1249    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1250        FlushStatus {
1251            region_id,
1252            version_control,
1253            pending_task: None,
1254            pending_ddls: Vec::new(),
1255            pending_writes: Vec::new(),
1256            pending_bulk_writes: Vec::new(),
1257        }
1258    }
1259
1260    /// Merges the task to pending task.
1261    fn merge_task(&mut self, task: RegionFlushTask) {
1262        if let Some(pending) = &mut self.pending_task {
1263            pending.merge(task);
1264        } else {
1265            self.pending_task = Some(task);
1266        }
1267    }
1268
1269    fn on_failure(self, err: Arc<Error>) {
1270        if let Some(mut task) = self.pending_task {
1271            task.on_failure(err.clone());
1272        }
1273        for ddl in self.pending_ddls {
1274            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1275                region_id: self.region_id,
1276            }));
1277        }
1278        for write_req in self.pending_writes {
1279            write_req
1280                .sender
1281                .send(Err(err.clone()).context(FlushRegionSnafu {
1282                    region_id: self.region_id,
1283                }));
1284        }
1285    }
1286}
1287
1288#[cfg(test)]
1289mod tests {
1290    use mito_codec::row_converter::build_primary_key_codec;
1291    use tokio::sync::oneshot;
1292
1293    use super::*;
1294    use crate::cache::CacheManager;
1295    use crate::memtable::bulk::part::BulkPartConverter;
1296    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1297    use crate::memtable::{Memtable, RangesOptions};
1298    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1299    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1300    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1301    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1302
1303    #[test]
1304    fn test_get_mutable_limit() {
1305        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1306        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1307        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1308        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1309    }
1310
    /// Walks the manager through reserve -> schedule free -> free and checks
    /// the flush/stall decisions and the memory counters at each step.
    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable: active memory drops while used memory is unchanged.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable: used memory drops while active memory is unchanged.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }
1334
    /// Checks the flush/stall decisions once the reserved memory exceeds the
    /// global limit.
    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Now mutable is enough.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1357
    /// Checks that the notifier only observes a change after memory is freed.
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        // Reserving memory and scheduling it to be freed don't notify.
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Freeing the memory notifies the receiver.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1369
    /// Scheduling a flush for a region built with the default
    /// `VersionControlBuilder` finishes immediately with output 0 and leaves
    /// the region out of the status map.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Manual,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
            partition_expr: None,
        };
        // Register a waiter so we can observe the result of the task.
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // The region must not be tracked by the scheduler.
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1406
    /// Schedules three flush tasks for the same region: the first one submits a job,
    /// the others are merged as pending. After the first flush finishes and the
    /// region has nothing left to flush, the pending waiters are notified without
    /// submitting another job.
    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Manual,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version: the first immutable memtable has id 0.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks, each with a waiter to observe its result.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished: apply an edit that removes memtable 0.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        // Every pending waiter is notified with output 0.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1489
    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            // The range isn't encoded, so it must come back as a single iterator source.
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                total_rows += source
                    .take_iter()
                    .map(|x| x.unwrap().num_rows())
                    .sum::<usize>();
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                total_rows += source
                    .take_iter()
                    .map(|x| x.unwrap().num_rows())
                    .sum::<usize>();
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
1591
1592    #[tokio::test]
1593    async fn test_schedule_pending_request_on_flush_success() {
1594        common_telemetry::init_default_ut_logging();
1595        let job_scheduler = Arc::new(VecScheduler::default());
1596        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1597        let (tx, _rx) = mpsc::channel(4);
1598        let mut scheduler = env.mock_flush_scheduler();
1599        let mut builder = VersionControlBuilder::new();
1600        // Overwrites the empty memtable builder.
1601        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1602        let version_control = Arc::new(builder.build());
1603        // Writes data to the memtable so it is not empty.
1604        let version_data = version_control.current();
1605        write_rows_to_version(&version_data.version, "host0", 0, 10);
1606        let manifest_ctx = env
1607            .mock_manifest_context(version_data.version.metadata.clone())
1608            .await;
1609        // Creates 2 tasks.
1610        let mut tasks: Vec<_> = (0..2)
1611            .map(|_| RegionFlushTask {
1612                region_id: builder.region_id(),
1613                reason: FlushReason::Manual,
1614                senders: Vec::new(),
1615                request_sender: tx.clone(),
1616                access_layer: env.access_layer.clone(),
1617                listener: WorkerListener::default(),
1618                engine_config: Arc::new(MitoConfig::default()),
1619                row_group_size: None,
1620                cache_manager: Arc::new(CacheManager::default()),
1621                manifest_ctx: manifest_ctx.clone(),
1622                index_options: IndexOptions::default(),
1623                flush_semaphore: Arc::new(Semaphore::new(2)),
1624                is_staging: false,
1625                partition_expr: None,
1626            })
1627            .collect();
1628        // Schedule first task.
1629        let task = tasks.pop().unwrap();
1630        scheduler
1631            .schedule_flush(builder.region_id(), &version_control, task)
1632            .unwrap();
1633        // Should schedule 1 flush.
1634        assert_eq!(1, scheduler.region_status.len());
1635        assert_eq!(1, job_scheduler.num_jobs());
1636        // Schedule second task.
1637        let task = tasks.pop().unwrap();
1638        scheduler
1639            .schedule_flush(builder.region_id(), &version_control, task)
1640            .unwrap();
1641        assert!(
1642            scheduler
1643                .region_status
1644                .get(&builder.region_id())
1645                .unwrap()
1646                .pending_task
1647                .is_some()
1648        );
1649
1650        // Check the new version.
1651        let version_data = version_control.current();
1652        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1653        // Assumes the flush job is finished.
1654        version_control.apply_edit(
1655            Some(RegionEdit {
1656                files_to_add: Vec::new(),
1657                files_to_remove: Vec::new(),
1658                timestamp_ms: None,
1659                compaction_time_window: None,
1660                flushed_entry_id: None,
1661                flushed_sequence: None,
1662                committed_sequence: None,
1663            }),
1664            &[0],
1665            builder.file_purger(),
1666        );
1667        write_rows_to_version(&version_data.version, "host1", 0, 10);
1668        scheduler.on_flush_success(builder.region_id());
1669        assert_eq!(2, job_scheduler.num_jobs());
1670        // The pending task is cleared.
1671        assert!(
1672            scheduler
1673                .region_status
1674                .get(&builder.region_id())
1675                .unwrap()
1676                .pending_task
1677                .is_none()
1678        );
1679    }
1680}