mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
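    /// Background scheduler used to run compaction tasks.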
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
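    /// Cache manager passed to compaction tasks.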
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

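        // A compaction task is already running for this region: regular requests simply
        // piggyback on it, while manual (strict window) requests are queued to run right
        // after the current task finishes.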
        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
170                        "Region {} is compacting, manually compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job is finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

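        // A manual compaction was queued while the region was compacting; schedule it first.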
        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends error to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact; we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

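                // On success the remote scheduler owns the waiters; on failure we either
                // bail out or take the waiters back for local compaction, depending on the
                // fallback option.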
                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Take the waiters back from the error so they can be handed to local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submit the compaction task.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // If failed to submit the job, we need to remove the region from the scheduler.
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds the TTL of a table by first examining the table options, then the database options.
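/// Falls back to the database (schema) level TTL fetched through the schema metadata manager
/// when the table does not set one.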
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If table TTL is set, we use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the pending waiters of the running compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current one if it already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

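    /// Notifies every waiter of this region, including the waiter of a pending manual
    /// compaction if any, that the compaction failed.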
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder that creates a [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, false)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        // We use a special cache strategy for compaction.
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // We ignore file-not-found errors during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting the time range into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        SeqScan::new(scan_input, true)
            .build_reader_for_compaction()
            .await
    }
}

/// Converts a time range to predicates so that rows outside the range will be filtered out.
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

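    // Build half-open range predicates (`ts >= start` and `ts < end`) on the time index column.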
    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .gt_eq(ts_to_lit(*start, ts_col_unit)?)]
        }

        (None, Some(end)) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .lt(ts_to_lit(*end, ts_col_unit)?)]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs);
    Ok(predicate)
}

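/// Converts a timestamp into a DataFusion literal in the time index column's unit,
/// returning an error if the conversion overflows.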
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for next compaction and removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());

        // On compaction finished and schedule next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // 5 files for next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for next compaction and removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler
            .region_status
            .get(&region_id)
            .unwrap()
            .pending_request
            .is_none());

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Current job num should be 1 since compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished and schedule next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create staging manifest context using the same pattern as SchedulerEnv
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test regular compaction bypass in staging mode
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0); // is there a better way to check this?
        assert_eq!(0, scheduler.region_status.len());
    }
}