mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod picker;
mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequest>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequest>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
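        // If this region is already compacting, don't start another task: regular
        // requests only queue their waiters, while manual (strict window) requests
        // are stashed to run right after the current task finishes.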
        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
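        // Mark the region as compacting before scheduling. `schedule_compaction_request`
        // removes the entry again if there is nothing to pick or the task fails to submit.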
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

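        // A pending manual compaction takes precedence over scheduling the next
        // regular compaction.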
        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends the error to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

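        // Resolve the effective TTL: table options first, then database options.
        // A lookup failure is non-fatal; we log a warning and fall back to the default TTL.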
        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            region_dir: access_layer.region_dir().to_string(),
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact; we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If the region is configured to compact remotely, schedule the compaction job remotely.
        // It falls back to local compaction if no remote job scheduler is available.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Hand the waiters back to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submit the compaction task.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // If we failed to submit the job, remove the region from the scheduler.
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down, so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds the TTL of a table by first examining table options, then database options.
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If the table TTL is set, use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

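    // Otherwise fall back to the database-level (schema) options, bounded by a fetch
    // timeout; a missing value falls back to the default TTL.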
    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Adds the waiter to the list of pending waiters.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current value if one already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(mut prev) = self.pending_request.replace(pending) {
            debug!(
                "Replacing pending compaction request {:?} with a new request for region: {}",
                prev.options, self.region_id
            );
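            // Notify the replaced request's waiter so its caller is not left waiting.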
            if let Some(waiter) = prev.waiter.take_inner() {
                waiter.send(ManualCompactionOverrideSnafu.fail());
            }
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequest>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
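        // Drain the queued waiters and append the incoming one (if any); all of them
        // are notified with the result when this compaction completes or fails.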
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies the output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder that creates a [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            // We use a special cache strategy for compaction.
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            // We ignore file-not-found errors during compaction.
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into predicates.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        SeqScan::new(scan_input, true)
            .build_reader_for_compaction()
            .await
    }
}

/// Converts a time range into predicates so that rows outside the range are filtered out.
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

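    // Build predicates for the half-open interval `[start, end)`: the start bound
    // is inclusive (>=) and the end bound is exclusive (<).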
    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .gt_eq(ts_to_lit(*start, ts_col_unit)?)]
        }
        (None, Some(end)) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .lt(ts_to_lit(*end, ts_col_unit)?)]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs);
    Ok(predicate)
}

fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
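    // Wrap the raw value in a `ScalarValue` of the matching time unit, leaving the
    // timezone unset.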
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

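    // Scan every level and gather the files each level reports as expired
    // relative to `now` and the TTL.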
    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [Options::StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use tokio::sync::oneshot;

    use super::*;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, so the picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());

        // Compaction finished; the scheduler should schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Add 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler
            .region_status
            .get(&region_id)
            .unwrap()
            .pending_request
            .is_none());

        // Schedule a manual compaction while the first one is running.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // The job count should still be 1 since a compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Compaction finished; the pending manual compaction should be scheduled.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
}