// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Flush related utilities and structs.
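//!
//! At a high level: a [WriteBufferManager] tracks memtable memory usage and
//! decides when the engine should flush, the [FlushScheduler] turns each
//! [RegionFlushTask] into a background job, and the job writes immutable
//! memtables to level 0 SSTs before updating the region manifest.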

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{Semaphore, mpsc, watch};

use crate::access_layer::{
    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
    BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
    INFLIGHT_FLUSH_COUNT,
};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{
    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::worker::WorkerListener;

/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether to trigger a flush of the engine.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so they are no longer
    /// counted when checking the mutable (soft) limit.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager we have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable memory limit.
    ///
    /// The write buffer manager should flush memtables when the mutable memory usage
    /// exceeds this limit.
    fn flush_limit(&self) -> usize;
}

pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;

/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
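///
/// A minimal sketch of the accounting lifecycle (illustrative only, not a
/// doctest; the buffer size and byte counts are arbitrary):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1024 * 1024);
/// manager.reserve_mem(4096); // a write buffers 4 KiB in a mutable memtable
/// assert_eq!(4096, manager.memory_usage());
/// manager.schedule_free_mem(4096); // the memtable is frozen for flush
/// assert_eq!(0, manager.mutable_usage());
/// manager.free_mem(4096); // the flushed memtable is dropped
/// assert_eq!(0, manager.memory_usage());
/// ```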
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit.
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
    /// Returns a new manager with specific `global_write_buffer_size`.
    pub fn new(global_write_buffer_size: usize) -> Self {
        Self {
            global_write_buffer_size,
            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a notifier to the manager.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns memory usage of mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Returns the size limit for mutable memtables.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Reserves half of the write buffer for mutable memtables.
        global_write_buffer_size / 2
    }
}

impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_memtable_memory_usage >= self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_memtable_memory_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        if memory_usage >= self.global_write_buffer_size {
            return true;
        }

        false
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Notifies the worker after the memory usage is decreased. When we drop the memtable
            // outside of the worker, the worker may still stall requests because the memory usage
            // is not updated. So we need to notify the worker to handle stalled requests again.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }

    fn flush_limit(&self) -> usize {
        self.mutable_limit
    }
}

/// Reason for a flush task.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
    /// Enter staging mode.
    EnterStaging,
}

impl FlushReason {
    /// Gets the flush reason as a static str.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}

/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    pub(crate) access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) row_group_size: Option<usize>,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore to control flush concurrency.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
}

impl RegionFlushTask {
    /// Pushes the sender if it is not `None`.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies the senders that the job succeeded.
    fn on_success(self) {
        for sender in self.senders {
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to waiters.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the flush task into a background job.
    ///
    /// We must call this in the region worker.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Gets a version of this region before creating a job to get the current
        // wal entry id, sequence, and immutable memtables.
        let version_data = version_control.current();

        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush task.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;

        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    // The last entry has been flushed.
                    flushed_entry_id: version_data.last_entry_id,
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                    is_staging: self.is_staging,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Discard the timer.
                timer.stop_and_discard();

                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Checks if the region is in staging mode.
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully updated manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }

    async fn do_flush_memtables(
        &self,
        version: &VersionRef,
        write_opts: WriteOptions,
    ) -> Result<DoFlushMemtablesResult> {
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        let mut series_count = 0;
        // Converts the partition expression once, outside the loop over memtables.
        let partition_expr = match &version.metadata.partition_expr {
            None => None,
            Some(json_expr) if json_expr.is_empty() => None,
            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
        };
        let mut flush_metrics = Metrics::new(WriteType::Flush);
        for mem in memtables {
            if mem.is_empty() {
                // Skips empty memtables.
                continue;
            }

            // Compacts the memtable first; this waits for the background compaction to finish.
            let compact_start = std::time::Instant::now();
            if let Err(e) = mem.compact(true) {
                common_telemetry::error!(e; "Failed to compact memtable before flush");
            }
            let compact_cost = compact_start.elapsed();
            flush_metrics.compact_memtable += compact_cost;

            // Sets the `for_flush` flag to true.
            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
            let num_mem_ranges = mem_ranges.ranges.len();
            let num_mem_rows = mem_ranges.stats.num_rows();
            let memtable_id = mem.id();
            // Increases the series count for each mem range. We assume each mem range contains
            // different series, so the counter may report more series than the actual count.
            series_count += mem_ranges.stats.series_count();

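            // Memtables that expose record batches take the flat path below, which
            // flushes their ranges in parallel tasks; other memtables go through the
            // serial `Source`-based path.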
            if mem_ranges.is_record_batch() {
                let flush_start = Instant::now();
                let FlushFlatMemResult {
                    num_encoded,
                    max_sequence,
                    num_sources,
                    results,
                } = self
                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
                    .await?;
                for (source_idx, result) in results.into_iter().enumerate() {
                    let (ssts_written, metrics) = result?;
                    if ssts_written.is_empty() {
                        // No data written.
                        continue;
                    }

                    common_telemetry::debug!(
                        "Region {} flush one memtable {} {}/{}, metrics: {:?}",
                        self.region_id,
                        memtable_id,
                        source_idx,
                        num_sources,
                        metrics
                    );

                    flush_metrics = flush_metrics.merge(metrics);

                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                        flushed_bytes += sst_info.file_size;
                        Self::new_file_meta(
                            self.region_id,
                            max_sequence,
                            sst_info,
                            partition_expr.clone(),
                        )
                    }));
                }

                common_telemetry::debug!(
                    "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
                    self.region_id,
                    num_sources,
                    memtable_id,
                    num_mem_ranges,
                    num_encoded,
                    num_mem_rows,
                    flush_start.elapsed(),
                    compact_cost,
                );
            } else {
                let max_sequence = mem_ranges.stats.max_sequence();
                let source = memtable_source(mem_ranges, &version.options).await?;

                // Flush to level 0.
                let source = Either::Left(source);
                let write_request = self.new_write_request(version, max_sequence, source);

                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts_written = self
                    .access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                if ssts_written.is_empty() {
                    // No data written.
                    continue;
                }

                debug!(
                    "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
                    self.region_id, num_mem_ranges, num_mem_rows, metrics
                );

                flush_metrics = flush_metrics.merge(metrics);

                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                    flushed_bytes += sst_info.file_size;
                    Self::new_file_meta(
                        self.region_id,
                        max_sequence,
                        sst_info,
                        partition_expr.clone(),
                    )
                }));
            };
        }

        Ok(DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            flush_metrics,
        })
    }

    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            version.metadata.primary_key.len(),
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        let max_sequence = flat_sources.max_sequence;
        for source in flat_sources.sources {
            let source = Either::Right(source);
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                Ok((ssts, metrics))
            });
            tasks.push(task);
        }
        for encoded in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                Ok((smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            max_sequence,
            num_sources,
            results,
        })
    }

    fn new_file_meta(
        region_id: RegionId,
        max_sequence: u64,
        sst_info: SstInfo,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id: sst_info.file_id,
            time_range: sst_info.time_range,
            level: 0,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            available_indexes: sst_info.index_metadata.build_available_indexes(),
            indexes: sst_info.index_metadata.build_indexes(),
            index_file_size: sst_info.index_metadata.file_size,
            index_version: 0,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            sequence: NonZeroU64::new(max_sequence),
            partition_expr,
            num_series: sst_info.num_series,
        }
    }

    fn new_write_request(
        &self,
        version: &VersionRef,
        max_sequence: u64,
        source: Either<Source, FlatSource>,
    ) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: version.metadata.clone(),
            source,
            cache_manager: self.cache_manager.clone(),
            storage: version.options.storage.clone(),
            max_sequence: Some(max_sequence),
            index_options: self.index_options.clone(),
            index_config: self.engine_config.index.clone(),
            inverted_index_config: self.engine_config.inverted_index.clone(),
            fulltext_index_config: self.engine_config.fulltext_index.clone(),
            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
        }
    }

    /// Notifies the worker of the flush job status.
    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self
            .request_sender
            .send(WorkerRequestWithTime::new(request))
            .await
        {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges two flush tasks.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Now we only merge senders. They share the same flush reason.
        self.senders.append(&mut other.senders);
    }
}

struct FlushFlatMemResult {
    num_encoded: usize,
    max_sequence: u64,
    num_sources: usize,
    results: Vec<Result<(SstInfoArray, Metrics)>>,
}

struct DoFlushMemtablesResult {
    file_metas: Vec<FileMeta>,
    flushed_bytes: u64,
    series_count: usize,
    flush_metrics: Metrics,
}

/// Returns a [Source] for the given memtable.
async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
    let source = if mem_ranges.ranges.len() == 1 {
        let only_range = mem_ranges.ranges.into_values().next().unwrap();
        let iter = only_range.build_iter()?;
        Source::Iter(iter)
    } else {
        // TODO(hl): a workaround since the sync version of MergeReader is WIP.
        let sources = mem_ranges
            .ranges
            .into_values()
            .map(|r| r.build_iter().map(Source::Iter))
            .collect::<Result<Vec<_>>>()?;
        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
        let maybe_dedup = if options.append_mode {
            // No dedup in append mode.
            Box::new(merge_reader) as _
        } else {
            // Dedup according to the merge mode.
            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
                MergeMode::LastRow => {
                    Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
                }
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    merge_reader,
                    LastNonNull::new(false),
                    None,
                )) as _,
            }
        };
        Source::Reader(maybe_dedup)
    };
    Ok(source)
}

struct FlatSources {
    max_sequence: u64,
    sources: SmallVec<[FlatSource; 4]>,
    encoded: SmallVec<[EncodedRange; 4]>,
}

/// Returns the max sequence and [FlatSource]s for the given memtable.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges, stats } = mem_ranges;
    let max_sequence = stats.max_sequence();
    let mut flat_sources = FlatSources {
        max_sequence,
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        let only_range = ranges.into_values().next().unwrap();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push(encoded);
        } else {
            let iter = only_range.build_record_batch_iter(None)?;
            // Dedup according to append mode and merge mode.
            // Even a single range may have duplicate rows.
            let iter = maybe_dedup_one(options, field_column_start, iter);
            flat_sources.sources.push(FlatSource::Iter(iter));
        };
    } else {
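        // Groups small ranges together: each merged source targets at least 1/8 of
        // the total rows (and no less than one row group), which bounds how many
        // parallel SST writers a single memtable can spawn.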
        let min_flush_rows = stats.num_rows / 8;
        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        let mut input_iters = Vec::with_capacity(num_ranges);
        for (_range_id, range) in ranges {
            if let Some(encoded) = range.encoded() {
                flat_sources.encoded.push(encoded);
                continue;
            }

            let iter = range.build_record_batch_iter(None)?;
            input_iters.push(iter);
            last_iter_rows += range.num_rows();

            if last_iter_rows > min_flush_rows {
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options,
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
                last_iter_rows = 0;
            }
        }

        // Handle remaining iters.
        if !input_iters.is_empty() {
            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;

            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
        }
    }

    Ok(flat_sources)
}

fn merge_and_dedup(
    schema: &SchemaRef,
    options: &RegionOptions,
    field_column_start: usize,
    input_iters: Vec<BoxedRecordBatchIterator>,
) -> Result<BoxedRecordBatchIterator> {
    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
    let maybe_dedup = if options.append_mode {
        // No dedup in append mode
        Box::new(merge_iter) as _
    } else {
        // Dedup according to merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                merge_iter,
                FlatLastNonNull::new(field_column_start, false),
            )) as _,
        }
    };
    Ok(maybe_dedup)
}

fn maybe_dedup_one(
    options: &RegionOptions,
    field_column_start: usize,
    input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
    if options.append_mode {
        // No dedup in append mode
        input_iter
    } else {
        // Dedup according to merge mode.
        match options.merge_mode() {
            MergeMode::LastRow => {
                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
            }
            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
                input_iter,
                FlatLastNonNull::new(field_column_start, false),
            )),
        }
    }
}

/// Manages background flushes of a worker.
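///
/// At most one flush job per region runs at a time. Scheduling another task
/// for the same region merges it into a pending task, which runs after the
/// current job finishes.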
pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler.
    scheduler: SchedulerRef,
}

impl FlushScheduler {
    /// Creates a new flush scheduler.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns true if the region already requested a flush.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    fn schedule_flush_task(
        &mut self,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        let region_id = task.region_id;

        // If the current region doesn't have a flush status, we can flush the region directly.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);

            return Err(e);
        }
        // Submits a flush job.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // If the scheduler returns an error, senders in the job will be dropped and waiters
            // can get recv errors.
            error!(e; "Failed to schedule flush job for region {}", region_id);

            return Err(e);
        }
        Ok(())
    }

    /// Schedules a flush `task` for specific `region`.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);

        let version = version_control.current().version;
        if version.memtables.is_empty() {
            debug_assert!(!self.region_status.contains_key(&region_id));
            // The region has nothing to flush.
            task.on_success();
            return Ok(());
        }

        // Don't increase the counter if a region has nothing to flush.
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();

        // If the current region already has a flush status, a flush is running;
        // merges the task into the pending task instead of scheduling it now.
        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
            debug!("Merging flush task for region {}", region_id);
            flush_status.merge_task(task);
            return Ok(());
        }

        self.schedule_flush_task(version_control, task)?;

        // Adds this region to the status map.
        let _ = self.region_status.insert(
            region_id,
            FlushStatus::new(region_id, version_control.clone()),
        );

        Ok(())
    }

    /// Notifies the scheduler that the flush job is finished.
    ///
    /// Returns all pending requests if the region doesn't need to flush again.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(
        Vec<SenderDdlRequest>,
        Vec<SenderWriteRequest>,
        Vec<SenderBulkRequest>,
    )> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        // If the region doesn't have any pending flush task, we need to remove it from the status.
        if flush_status.pending_task.is_none() {
            debug!(
                "Region {} doesn't have any pending flush task, removing it from the status",
                region_id
            );
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If the region has a pending task but nothing to flush, we also remove it from the status.
        let version_data = flush_status.version_control.current();
        if version_data.version.memtables.is_empty() {
            // Safety: The pending task is not None.
            let task = flush_status.pending_task.take().unwrap();
            // The region has nothing to flush. We can notify the pending task.
            task.on_success();
            debug!(
                "Region {} has nothing to flush, removing it from the status",
                region_id
            );
            // Safety: The flush status must exist.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            return Some((
                flush_status.pending_ddls,
                flush_status.pending_writes,
                flush_status.pending_bulk_writes,
            ));
        }

        // If the region has a pending task and something to flush, we need to schedule it.
        debug!("Scheduling pending flush task for region {}", region_id);
        // Safety: The pending task is not None.
        let task = flush_status.pending_task.take().unwrap();
        let version_control = flush_status.version_control.clone();
        if let Err(err) = self.schedule_flush_task(&version_control, task) {
            error!(
                err;
                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
            );
        }
        // We can flush the region again, keep it in the region status.
        None
    }

    /// Notifies the scheduler that the flush job failed.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);

        FLUSH_FAILURE_TOTAL.inc();

        // Removes this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends the error to their waiters.
        flush_status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Removes this region.
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        flush_status.on_failure(err);
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request a flush.
    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_bulk_writes.push(request);
    }

    /// Returns true if the region has pending DDLs.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }
}

impl Drop for FlushScheduler {
    fn drop(&mut self) {
        for (region_id, flush_status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for next flush.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}

impl FlushStatus {
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
            pending_bulk_writes: Vec::new(),
        }
    }

    /// Merges the task to the pending task.
    fn merge_task(&mut self, task: RegionFlushTask) {
        if let Some(pending) = &mut self.pending_task {
            pending.merge(task);
        } else {
            self.pending_task = Some(task);
        }
    }

    fn on_failure(self, err: Arc<Error>) {
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
}

#[cfg(test)]
mod tests {
    use mito_codec::row_converter::build_primary_key_codec;
    use tokio::sync::oneshot;

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::bulk::part::BulkPartConverter;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::memtable::{Memtable, RangesOptions};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};

    #[test]
    fn test_get_mutable_limit() {
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());

        // More than mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());

        // Freezes mutable.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));

        // Releases immutable.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Still more than the global limit; mutable (1100-200-450=450) is now less
        // than the mutable limit (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Mutable usage reaches the limit again (450+50=500), so flush keeps triggering.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();

        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
            flush_semaphore: Arc::new(Semaphore::new(2)),
            is_staging: false,
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }

    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            let mut total_rows = 0usize;
            for source in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }

    #[tokio::test]
    async fn test_schedule_pending_request_on_flush_success() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 2 tasks.
        let mut tasks: Vec<_> = (0..2)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Schedule second task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_some()
        );

        // Check the new version.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Assumes the flush job is finished.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        write_rows_to_version(&version_data.version, "host1", 0, 10);
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(2, job_scheduler.num_jobs());
        // The pending task is cleared.
        assert!(
            scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .pending_task
                .is_none()
        );
    }
}