mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notifications to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
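    /// Manages the memory budget shared by compaction tasks.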
    memory_manager: Arc<CompactionMemoryManager>,
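    /// Policy to apply when the compaction memory budget is exhausted.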
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
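    ///
    /// If the region already has a running compaction, a regular request only adds its
    /// waiter to the region's waiter list, while a strict-window (manual) request is
    /// recorded as pending and re-scheduled once the running task finishes.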
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job has finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule the next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends errors to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact; we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If the region is configured to compact remotely, we schedule the compaction job remotely.
        // It falls back to local compaction if there is no remote job scheduler.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
    }

    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds the TTL of a table by first examining the table options, then the database options.
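///
/// Fetching database-level options goes through the schema metadata manager and is bounded
/// by [crate::config::FETCH_OPTION_TIMEOUT]; if neither level specifies a TTL, the default
/// TTL is used.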
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If table TTL is set, we use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiter list of the ongoing compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current one if it already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies an output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder that creates [BoxedBatchReader]s for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        // We use a special cache strategy for compaction.
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // We ignore file-not-found errors during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting the time range into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts the time range to predicates so that rows outside the range are filtered out.
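///
/// For example (illustrative), a range `[1000, 2000)` over a millisecond time index column
/// `ts` yields the expressions `ts >= TimestampMillisecond(1000)` and
/// `ts < TimestampMillisecond(2000)`. A range with only one bound yields a single comparison,
/// and an unbounded range yields the default [PredicateGroup].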
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
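///
/// For example, three input files whose largest row groups decompress to 64 MiB, 32 MiB
/// and 16 MiB produce an estimate of 112 MiB.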
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file; the picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for the next compaction; remove old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for the next compaction; remove old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Current job num should be 1 since compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished, schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create staging manifest context using the same pattern as SchedulerEnv
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test regular compaction bypass in staging mode
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0); // is there a better way to check this?
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        // Spawn 3 tasks competing for memory, each trying to acquire 2MB
        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await; // Synchronize start
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // Only 1 should succeed (3MB limit, 2MB request, can only fit one)
        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        // Clean up
        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
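
    // A minimal sketch of a unit test for `ts_to_lit`, assuming `Timestamp::new_second`
    // is available from `common_time`: the timestamp should be converted into the time
    // index column's unit before it becomes a literal.
    #[test]
    fn test_ts_to_lit_converts_unit() {
        let ts = Timestamp::new_second(10);
        let expr = ts_to_lit(ts, TimeUnit::Millisecond).unwrap();
        assert_eq!(
            expr,
            datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(10_000), None))
        );
    }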
}