mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{CompactionTask, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
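    /// Engine configuration.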
    pub(crate) engine_config: Arc<MitoConfig>,
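    /// Current version of the region to compact.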
    pub(crate) current_version: CompactionVersion,
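    /// SST access layer of the region.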
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
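    /// Cache manager of the engine.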
    pub(crate) cache_manager: CacheManagerRef,
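    /// Manifest context of the region.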
    pub(crate) manifest_ctx: ManifestContextRef,
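    /// Event listener of the region worker.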
    pub(crate) listener: WorkerListener,
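    /// Manager to get schema metadata, such as database level options.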
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
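    /// Maximum parallelism allowed for this compaction.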
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
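    /// Returns the id of the region to compact.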
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
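    /// Background job scheduler that runs compaction tasks.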
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
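    /// Cache manager of the engine.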
    cache_manager: CacheManagerRef,
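    /// Engine configuration.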
    engine_config: Arc<MitoConfig>,
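    /// Event listener of the region worker.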
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
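    /// Creates a new compaction scheduler.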
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // The incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job has finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule the next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends errors to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact; we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submit the compaction task.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // If we fail to submit the job, remove the region from the scheduler.
                self.region_status.remove(&region_id);
                e
            })
    }

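    /// Removes the region from the status map and notifies all of its waiters with `err`.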
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds the TTL of a table by first examining the table options, then the database options.
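/// Falls back to the default TTL when neither is set.
///
/// ```ignore
/// // Illustrative sketch only (not compiled): `table_ttl` comes from the region options.
/// let ttl = find_ttl(table_id, table_ttl, &schema_metadata_manager).await?;
/// ```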
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If table TTL is set, we use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiters of the current compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current value if one already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies the output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder to create [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
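    /// Input SST files to compact.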
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
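    /// Time range of rows to read. Only windowed compaction sets this.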
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

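    /// Builds the [ScanInput] shared by both compaction readers.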
    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        // We use a dedicated cache strategy for compaction.
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // We ignore file-not-found errors during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts a time range into predicates so that rows outside the range are filtered out.
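/// A bounded range `[start, end)` becomes `ts >= start AND ts < end` on the time index column.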
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }

        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

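/// Converts the timestamp to the time index column's unit and wraps it into a DataFusion literal.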
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
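/// Returns an empty vector if `ttl` is `None`.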
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Add 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Current job num should be 1 since compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode.
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a staging manifest context using the same pattern as SchedulerEnv.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test the regular compaction bypass in staging mode.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0); // The staging bypass reports 0 without compacting.
        assert_eq!(0, scheduler.region_status.len());
    }
}