mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
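    /// Memory manager that tracks the memory budget of compaction tasks.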
    memory_manager: Arc<CompactionMemoryManager>,
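    /// Policy applied when the compaction memory budget is exhausted.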
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
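    /// Creates a new compaction scheduler.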
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in the staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job has finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // We should always try to compact the region until the picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        // Try to schedule the next compaction task for this region.
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends the error to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the status map.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let region_id = request.region_id();
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact, we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
    }

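    /// Submits the compaction task to the background scheduler.
    ///
    /// Removes the region from the status map if the submission fails.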
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

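    /// Removes the region from the status map and notifies all pending waiters of the failure.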
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
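///
/// Region-level options take precedence when `compaction_override` (for compaction) or a
/// region TTL (for TTL) is set; otherwise the database-level schema options apply, falling
/// back to the region defaults when none are present.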
async fn find_dynamic_options(
    table_id: TableId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    if region_options.compaction_override && region_options.ttl.is_some() {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((
            region_options.compaction.clone(),
            region_options.ttl.unwrap(),
        ));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = if region_options.ttl.is_some() {
        debug!(
            "Use region TTL directly for table {}: ttl={:?}",
            table_id, region_options.ttl
        );
        region_options.ttl.unwrap()
    } else {
        db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into()
    };

    let compaction = if !region_options.compaction_override {
        if let Some(schema_opts) = db_options {
            let map: HashMap<String, String> = schema_opts
                .extra_options
                .iter()
                .filter_map(|(k, v)| {
                    if k.starts_with("compaction.") {
                        Some((k.clone(), v.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            if map.is_empty() {
                region_options.compaction.clone()
            } else {
                crate::region::options::RegionOptions::try_from(&map)
                    .map(|o| o.compaction)
                    .unwrap_or_else(|e| {
                        error!(e; "Failed to create RegionOptions from map");
                        region_options.compaction.clone()
                    })
            }
        } else {
            debug!(
                "DB options is None for table {}, use region compaction: compaction={:?}",
                table_id, region_options.compaction
            );
            region_options.compaction.clone()
        }
    } else {
        debug!(
            "Compaction options are overridden for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the pending waiters.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current value if one already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

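    /// Sends the error to all waiters, including the waiter of any pending manual compaction.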
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies an output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// SerializedCompactionOutput is a serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder to create [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}
impl CompactionSstReaderBuilder<'_> {
    /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

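    /// Builds the common [ScanInput] used by both compaction readers.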
    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        // We use a special cache strategy for compaction.
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // We ignore file not found errors during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // This serves as a workaround for https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts a time range into predicates so that rows outside the range are filtered out.
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

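/// Converts a timestamp into a DataFusion literal of the given unit, failing if the conversion overflows.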
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
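///
/// This is a heuristic: the maximum row-group uncompressed size roughly bounds the
/// decoded bytes a reader may hold per input file at a time (an assumption, not a
/// guarantee from the file format).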
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_time::DatabaseTimeToLive;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        // Register the table without a TTL but with db-level compaction options.
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

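        // Case tuple: (name, db-level schema options, whether compaction_override is set,
        // region-level time window, expected resolved window in seconds).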
        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, the picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // Compaction finished; schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Add 5 files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // The job count should still be 1 since a compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Compaction finished; schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and a manifest context for staging mode.
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a staging manifest context using the same pattern as SchedulerEnv.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test regular compaction bypass in staging mode.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0); // is there a better way to check this?
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB budget.
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        // Spawn 3 tasks competing for memory, each trying to acquire 2MB.
        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await; // Synchronize the start.
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // Only 1 should succeed (3MB limit, 2MB per request: only one fits).
        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        // Clean up: dropping the guards should release all reserved memory.
        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}