mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{CompactionTask, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
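///
/// # Usage (illustrative sketch, not compiled)
///
/// The owning region worker drives the scheduler roughly as follows; the exact
/// call sites live in the worker and are omitted here.
///
/// ```ignore
/// // Schedule a regular compaction for a region; waiters are notified when it finishes.
/// scheduler
///     .schedule_compaction(
///         region_id,
///         Options::Regular(Default::default()),
///         &version_control,
///         &access_layer,
///         waiter,
///         &manifest_ctx,
///         schema_metadata_manager.clone(),
///         max_parallelism,
///     )
///     .await?;
///
/// // Report the outcome so the scheduler can pick the next task or fail pending waiters.
/// scheduler
///     .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
///     .await;
/// // On error: scheduler.on_compaction_failed(region_id, err);
/// ```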
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
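    ///
    /// If the region is in staging state, compaction is skipped and the waiter is
    /// notified immediately. If the region is already compacting, a regular request
    /// only merges its waiter into the running task, while a strict-window request
    /// is stored as a pending request and re-scheduled once the current task finishes.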
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job has finished successfully.
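    ///
    /// Re-schedules a pending manual compaction first if one exists; otherwise it
    /// tries to schedule the next regular compaction until the picker finds nothing
    /// to do for this region.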
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule the next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends errors to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
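    ///
    /// The request goes through three stages: the picker selects input files, the
    /// job is handed to a remote job scheduler if remote compaction is enabled
    /// (optionally falling back to local compaction on failure), and otherwise a
    /// local [CompactionTaskImpl] is submitted to the local scheduler.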
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact, we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If the region is configured to run compaction remotely, schedule the compaction job remotely.
        // It falls back to local compaction if there is no remote job scheduler.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submit the compaction task.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // If we failed to submit the job, remove the region from the scheduler.
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds the TTL of a table by first examining table options, then database options.
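///
/// Table-level TTL takes precedence; only when it is unset does this query the
/// schema (database) options through the metadata manager, bounded by
/// `FETCH_OPTION_TIMEOUT`. A missing database option falls back to the default TTL.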
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If the table TTL is set, we use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that should run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiters of the ongoing compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current value if one already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies an output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder to create a [BoxedBatchReader] or [BoxedRecordBatchStream] for compaction.
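///
/// A minimal sketch of how the builder is assembled (illustrative, not compiled;
/// the concrete values, e.g. `MergeMode::LastRow`, are placeholders):
///
/// ```ignore
/// let reader = CompactionSstReaderBuilder {
///     metadata: region_metadata.clone(),
///     sst_layer: access_layer.clone(),
///     cache: cache_manager.clone(),
///     inputs: &input_files,
///     append_mode: false,
///     filter_deleted: true,
///     time_range: None,
///     merge_mode: MergeMode::LastRow,
/// }
/// .build_sst_reader()
/// .await?;
/// ```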
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        // We use a special cache strategy for compaction.
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // We ignore file not found errors during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into predicates.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts a time range to predicates so that rows outside the range are filtered out.
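///
/// For a half-open range `[start, end)` on a time index column `ts`, this yields
/// (sketch, assuming both bounds are present):
///
/// ```ignore
/// vec![
///     col("ts").gt_eq(lit(start_in_column_unit)),
///     col("ts").lt(lit(end_in_column_unit)),
/// ]
/// ```
///
/// Missing bounds simply drop the corresponding predicate; an unbounded range
/// returns the default [PredicateGroup].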
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // safety: time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

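/// Converts a timestamp into a DataFusion literal in the time index column's unit,
/// failing with `TimeRangePredicateOverflowSnafu` if the conversion overflows.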
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
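/// Returns an empty vec when `ttl` is `None`, i.e. when TTL is disabled for the region.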
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Pending compaction request that should run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [Options::StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, the picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for the next compaction; removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for the next compaction; removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // The job count should still be 1 since a compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished, schedule the pending manual compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode.
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a staging manifest context using the same pattern as SchedulerEnv.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
                FormatType::PrimaryKey,
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test regular compaction bypass in staging mode.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        // The staging bypass responds with 0 affected rows.
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }
}