// mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
76
/// Region compaction request.
pub struct CompactionRequest {
    /// Engine configuration; used to build the picker and the compaction region.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Snapshot of the region version taken when the request was created.
    pub(crate) current_version: CompactionVersion,
    /// Access layer for reading/writing the region's SST files.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager handed down to the compaction task.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Listener notified about worker events (e.g. when compaction is scheduled).
    pub(crate) listener: WorkerListener,
    /// Used to fetch schema (database) level options for the region's table.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Maximum parallelism allowed for this compaction.
    pub(crate) max_parallelism: usize,
}
94
95impl CompactionRequest {
96    pub(crate) fn region_id(&self) -> RegionId {
97        self.current_version.metadata.region_id
98    }
99}
100
/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    /// Background scheduler that actually runs local compaction tasks.
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager forwarded to every compaction request built here.
    cache_manager: CacheManagerRef,
    /// Engine configuration shared by all compaction requests.
    engine_config: Arc<MitoConfig>,
    /// Memory manager handed to local compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Policy forwarded to compaction tasks for when compaction memory is exhausted.
    memory_policy: OnExhaustedPolicy,
    /// Listener notified about worker events.
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}
116
impl CompactionScheduler {
    /// Creates a new compaction scheduler bound to a single region worker.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    ///
    /// If the region is already compacting, a `Regular` request merely joins the
    /// waiter list, while a `StrictWindow` (manual) request is queued as the
    /// pending request and re-scheduled when the running compaction finishes.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // skip compaction if region is in staging state
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    // A previously queued manual request (if any) is failed with
                    // a "manual compaction override" error inside `set_pending_request`.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manually compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status: CompactionStatus =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;
        if matches!(result, Ok(true)) {
            // Only if the compaction request is scheduled successfully,
            // we insert the region into the status map.
            self.region_status.insert(region_id, status);
        }

        self.listener.on_compaction_scheduled(region_id);
        result.map(|_| ())
    }

    // Handle pending manual compaction request for the region.
    //
    // Returns true if should early return, false otherwise.
    pub(crate) async fn handle_pending_compaction_request(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> bool {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return true;
        };

        // If there is a pending manual compaction request, schedule it.
        // and defer returning the pending DDL requests to the caller.
        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
            return false;
        };

        let PendingCompaction {
            options,
            waiter,
            max_parallelism,
        } = pending_request;

        let request = {
            status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            )
        };

        match self.schedule_compaction_request(request, options).await {
            Ok(true) => {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
                true
            }
            Ok(false) => {
                // We still need to handle the pending DDL requests.
                // So we can't return early here.
                false
            }
            Err(e) => {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
                true
            }
        }
    }

    /// Notifies the scheduler that the compaction job is finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Vec<SenderDdlRequest> {
        // If there a pending compaction request, handle it first
        // and defer returning the pending DDL requests to the caller.
        if self
            .handle_pending_compaction_request(
                region_id,
                manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
        {
            return Vec::new();
        }

        let Some(status) = self.region_status.get_mut(&region_id) else {
            // The region status might be removed by the previous steps.
            // So we return empty DDL requests.
            return Vec::new();
        };

        // Notify all waiters that compaction is finished.
        for waiter in std::mem::take(&mut status.waiters) {
            waiter.send(Ok(0));
        }

        // If there are pending DDL requests, run them.
        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
        if !pending_ddl_requests.is_empty() {
            self.region_status.remove(&region_id);
            // If there are pending DDL requests, we should return them to the caller.
            // And skip try to schedule next compaction task.
            return pending_ddl_requests;
        }

        // We should always try to compact the region until picker returns None.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        // Try to schedule next compaction task for this region.
        match self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            Ok(true) => {
                debug!(
                    "Successfully scheduled next compaction for region id: {}",
                    region_id
                );
            }
            Ok(false) => {
                // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
                // All DDL requests and pending compaction requests have already been processed.
                // Safe to remove the region from status tracking.
                self.region_status.remove(&region_id);
            }
            Err(e) => {
                error!(e; "Failed to schedule next compaction for region {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
            }
        }

        Vec::new()
    }

    /// Notifies the scheduler that the compaction job is failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Add ddl request to pending queue.
    ///
    /// # Panics
    /// Panics if region didn't request compaction.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        debug!(
            "Added pending DDL request for region: {}, ddl: {:?}",
            request.region_id, request.request
        );
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddl_requests.push(request);
    }

    #[cfg(test)]
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map(|status| !status.pending_ddl_requests.is_empty())
            .unwrap_or(false);
        debug!(
            "Checked pending DDL requests for region: {}, has_pending: {}",
            region_id, has_pending
        );
        has_pending
    }

    /// Returns true if the region is compacting.
    pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a compaction request.
    ///
    /// Returns true if the compaction request is scheduled successfully.
    /// Returns false if no compaction task can be scheduled for this region.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<bool> {
        let region_id = request.region_id();
        // Resolve compaction options and TTL (region overrides vs schema metadata);
        // fall back to the region's own options if the lookup fails.
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        // NOTE(review): `region_id` was already bound above; this re-binding is redundant.
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        // Region options with the resolved compaction options spliced in.
        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(false);
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(true);
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters back to the caller for local compaction.
                        // (The scheduler error carries the waiters it was given.)
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| true)
    }

    /// Submits the local compaction task to the background scheduler.
    /// The inflight-compaction gauge is incremented/decremented around the run.
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .inspect_err(
                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
            )
    }

    /// Removes the region's tracked status (if any) and fails all of its
    /// pending waiters and requests with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending tasks.
        status.on_failure(err);
    }
}
601
602impl Drop for CompactionScheduler {
603    fn drop(&mut self) {
604        for (region_id, status) in self.region_status.drain() {
605            // We are shutting down so notify all pending tasks.
606            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
607        }
608    }
609}
610
611/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
612async fn find_dynamic_options(
613    table_id: TableId,
614    region_options: &crate::region::options::RegionOptions,
615    schema_metadata_manager: &SchemaMetadataManagerRef,
616) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
617    if region_options.compaction_override && region_options.ttl.is_some() {
618        debug!(
619            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
620            table_id, region_options.compaction, region_options.ttl
621        );
622        return Ok((
623            region_options.compaction.clone(),
624            region_options.ttl.unwrap(),
625        ));
626    }
627
628    let db_options = tokio::time::timeout(
629        crate::config::FETCH_OPTION_TIMEOUT,
630        schema_metadata_manager.get_schema_options_by_table_id(table_id),
631    )
632    .await
633    .context(TimeoutSnafu)?
634    .context(GetSchemaMetadataSnafu)?;
635
636    let ttl = if region_options.ttl.is_some() {
637        debug!(
638            "Use region TTL directly for table {}: ttl={:?}",
639            table_id, region_options.ttl
640        );
641        region_options.ttl.unwrap()
642    } else {
643        db_options
644            .as_ref()
645            .and_then(|options| options.ttl)
646            .unwrap_or_default()
647            .into()
648    };
649
650    let compaction = if !region_options.compaction_override {
651        if let Some(schema_opts) = db_options {
652            let map: HashMap<String, String> = schema_opts
653                .extra_options
654                .iter()
655                .filter_map(|(k, v)| {
656                    if k.starts_with("compaction.") {
657                        Some((k.clone(), v.clone()))
658                    } else {
659                        None
660                    }
661                })
662                .collect();
663            if map.is_empty() {
664                region_options.compaction.clone()
665            } else {
666                crate::region::options::RegionOptions::try_from(&map)
667                    .map(|o| o.compaction)
668                    .unwrap_or_else(|e| {
669                        error!(e; "Failed to create RegionOptions from map");
670                        region_options.compaction.clone()
671                    })
672            }
673        } else {
674            debug!(
675                "DB options is None for table {}, use region compaction: compaction={:?}",
676                table_id, region_options.compaction
677            );
678            region_options.compaction.clone()
679        }
680    } else {
681        debug!(
682            "No schema options for table {}, use region compaction: compaction={:?}",
683            table_id, region_options.compaction
684        );
685        region_options.compaction.clone()
686    };
687
688    debug!(
689        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
690        table_id, compaction, ttl
691    );
692    Ok((compaction, ttl))
693}
694
/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    /// They are drained into the next compaction request, or failed on error/shutdown.
    waiters: Vec<OutputTx>,
    /// Pending compactions that are supposed to run as soon as current compaction task finished.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
}
710
711impl CompactionStatus {
712    /// Creates a new [CompactionStatus]
713    fn new(
714        region_id: RegionId,
715        version_control: VersionControlRef,
716        access_layer: AccessLayerRef,
717    ) -> CompactionStatus {
718        CompactionStatus {
719            region_id,
720            version_control,
721            access_layer,
722            waiters: Vec::new(),
723            pending_request: None,
724            pending_ddl_requests: Vec::new(),
725        }
726    }
727
728    /// Merge the waiter to the pending compaction.
729    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
730        if let Some(waiter) = waiter.take_inner() {
731            self.waiters.push(waiter);
732        }
733    }
734
735    /// Set pending compaction request or replace current value if already exist.
736    fn set_pending_request(&mut self, pending: PendingCompaction) {
737        if let Some(prev) = self.pending_request.replace(pending) {
738            debug!(
739                "Replace pending compaction options with new request {:?} for region: {}",
740                prev.options, self.region_id
741            );
742            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
743        }
744    }
745
746    fn on_failure(mut self, err: Arc<Error>) {
747        for waiter in self.waiters.drain(..) {
748            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
749                region_id: self.region_id,
750            }));
751        }
752
753        if let Some(pending_compaction) = self.pending_request {
754            pending_compaction
755                .waiter
756                .send(Err(err.clone()).context(CompactRegionSnafu {
757                    region_id: self.region_id,
758                }));
759        }
760
761        for pending_ddl in self.pending_ddl_requests {
762            pending_ddl
763                .sender
764                .send(Err(err.clone()).context(CompactRegionSnafu {
765                    region_id: self.region_id,
766                }));
767        }
768    }
769
770    /// Creates a new compaction request for compaction picker.
771    ///
772    /// It consumes all pending compaction waiters.
773    #[allow(clippy::too_many_arguments)]
774    fn new_compaction_request(
775        &mut self,
776        request_sender: Sender<WorkerRequestWithTime>,
777        mut waiter: OptionOutputTx,
778        engine_config: Arc<MitoConfig>,
779        cache_manager: CacheManagerRef,
780        manifest_ctx: &ManifestContextRef,
781        listener: WorkerListener,
782        schema_metadata_manager: SchemaMetadataManagerRef,
783        max_parallelism: usize,
784    ) -> CompactionRequest {
785        let current_version = CompactionVersion::from(self.version_control.current().version);
786        let start_time = Instant::now();
787        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
788        waiters.extend(std::mem::take(&mut self.waiters));
789
790        if let Some(waiter) = waiter.take_inner() {
791            waiters.push(waiter);
792        }
793
794        CompactionRequest {
795            engine_config,
796            current_version,
797            access_layer: self.access_layer.clone(),
798            request_sender: request_sender.clone(),
799            waiters,
800            start_time,
801            cache_manager,
802            manifest_ctx: manifest_ctx.clone(),
803            listener,
804            schema_metadata_manager,
805            max_parallelism,
806        }
807    }
808}
809
/// One planned compaction: the input files to merge and where the result goes.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}
821
/// SerializedCompactionOutput is a serialized version of [CompactionOutput] by replacing [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Same as [CompactionOutput::output_level].
    output_level: Level,
    /// Input file metadata (the serializable counterpart of [FileHandle]).
    inputs: Vec<FileMeta>,
    /// Same as [CompactionOutput::filter_deleted].
    filter_deleted: bool,
    /// Same as [CompactionOutput::output_time_range].
    output_time_range: Option<TimestampRange>,
}
830
/// Builders to create [BoxedRecordBatchStream] for compaction.
struct CompactionSstReaderBuilder<'a> {
    /// Metadata of the region being compacted.
    metadata: RegionMetadataRef,
    /// Access layer used to read the SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager; wrapped in a compaction-specific cache strategy.
    cache: CacheManagerRef,
    /// Input SST files to read.
    inputs: &'a [FileHandle],
    /// Append mode flag forwarded to the scan.
    append_mode: bool,
    /// Whether to remove deletion markers while reading.
    filter_deleted: bool,
    /// Optional output time range; when set it is converted into a predicate
    /// so rows outside the range are filtered.
    time_range: Option<TimestampRange>,
    /// Merge mode forwarded to the scan.
    merge_mode: MergeMode,
}
842
843impl CompactionSstReaderBuilder<'_> {
844    /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
845    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
846        let scan_input = self.build_scan_input()?.with_compaction(true);
847
848        SeqScan::new(scan_input)
849            .build_flat_reader_for_compaction()
850            .await
851    }
852
853    fn build_scan_input(self) -> Result<ScanInput> {
854        let mapper = ProjectionMapper::all(&self.metadata, true)?;
855        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
856            .with_files(self.inputs.to_vec())
857            .with_append_mode(self.append_mode)
858            // We use special cache strategy for compaction.
859            .with_cache(CacheStrategy::Compaction(self.cache))
860            .with_filter_deleted(self.filter_deleted)
861            // We ignore file not found error during compaction.
862            .with_ignore_file_not_found(true)
863            .with_merge_mode(self.merge_mode)
864            .with_flat_format(true);
865
866        // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
867        // by converting time ranges into predicate.
868        if let Some(time_range) = self.time_range {
869            scan_input =
870                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
871        }
872
873        Ok(scan_input)
874    }
875}
876
877/// Converts time range to predicates so that rows outside the range will be filtered.
878fn time_range_to_predicate(
879    range: TimestampRange,
880    metadata: &RegionMetadataRef,
881) -> Result<PredicateGroup> {
882    let ts_col = metadata.time_index_column();
883
884    // safety: time index column's type must be a valid timestamp type.
885    let ts_col_unit = ts_col
886        .column_schema
887        .data_type
888        .as_timestamp()
889        .unwrap()
890        .unit();
891
892    let exprs = match (range.start(), range.end()) {
893        (Some(start), Some(end)) => {
894            vec![
895                datafusion_expr::col(ts_col.column_schema.name.clone())
896                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
897                datafusion_expr::col(ts_col.column_schema.name.clone())
898                    .lt(ts_to_lit(*end, ts_col_unit)?),
899            ]
900        }
901        (Some(start), None) => {
902            vec![
903                datafusion_expr::col(ts_col.column_schema.name.clone())
904                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
905            ]
906        }
907
908        (None, Some(end)) => {
909            vec![
910                datafusion_expr::col(ts_col.column_schema.name.clone())
911                    .lt(ts_to_lit(*end, ts_col_unit)?),
912            ]
913        }
914        (None, None) => {
915            return Ok(PredicateGroup::default());
916        }
917    };
918
919    let predicate = PredicateGroup::new(metadata, &exprs)?;
920    Ok(predicate)
921}
922
923fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
924    let ts = ts
925        .convert_to(ts_col_unit)
926        .context(TimeRangePredicateOverflowSnafu {
927            timestamp: ts,
928            unit: ts_col_unit,
929        })?;
930    let val = ts.value();
931    let scalar_value = match ts_col_unit {
932        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
933        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
934        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
935        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
936    };
937    Ok(datafusion_expr::lit(scalar_value))
938}
939
940/// Finds all expired SSTs across levels.
941fn get_expired_ssts(
942    levels: &[LevelMeta],
943    ttl: Option<TimeToLive>,
944    now: Timestamp,
945) -> Vec<FileHandle> {
946    let Some(ttl) = ttl else {
947        return vec![];
948    };
949
950    levels
951        .iter()
952        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
953        .collect()
954}
955
956/// Estimates compaction memory as the sum of all input files' maximum row-group
957/// uncompressed sizes.
958fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
959    picker_output
960        .outputs
961        .iter()
962        .flat_map(|output| output.inputs.iter())
963        .map(|file: &FileHandle| {
964            let meta = file.meta_ref();
965            meta.max_row_group_uncompressed_size
966        })
967        .sum()
968}
969
/// Pending compaction request that is supposed to run after current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiters of pending requests, notified when the pending compaction completes.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for pending compaction.
    pub(crate) max_parallelism: usize,
}
980
981#[cfg(test)]
982mod tests {
983    use std::assert_matches::assert_matches;
984    use std::time::Duration;
985
986    use api::v1::region::StrictWindow;
987    use common_datasource::compression::CompressionType;
988    use common_meta::key::schema_name::SchemaNameValue;
989    use common_time::DatabaseTimeToLive;
990    use tokio::sync::{Barrier, oneshot};
991
992    use super::*;
993    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
994    use crate::error::InvalidSchedulerStateSnafu;
995    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
996    use crate::region::ManifestContext;
997    use crate::schedule::scheduler::{Job, Scheduler};
998    use crate::sst::FormatType;
999    use crate::test_util::mock_schema_metadata_manager;
1000    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1001    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1002
    /// A scheduler that always fails to schedule jobs; used to exercise the
    /// compaction scheduler's error-handling paths.
    struct FailingScheduler;

    #[async_trait::async_trait]
    impl Scheduler for FailingScheduler {
        fn schedule(&self, _job: Job) -> Result<()> {
            // Every schedule attempt reports an invalid scheduler state.
            InvalidSchedulerStateSnafu.fail()
        }

        async fn stop(&self, _await_termination: bool) -> Result<()> {
            Ok(())
        }
    }
1015
    /// Database-level compaction options (set via schema `extra_options`)
    /// should be picked up by `find_dynamic_options` and a request built from
    /// them should be schedulable.
    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        // Register table without ttl but with db-level compaction options
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        // The db-level "2h" time window should be resolved into the options.
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        // Build a request through the region's compaction status, then make
        // sure scheduling it succeeds.
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }
1089
    /// Table-level (override) compaction options must take priority over
    /// db-level options; db-level options apply only when no table override
    /// is set.
    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        // Builds a schema value carrying db-level TWCS options with the given
        // time window.
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        // (case name, db-level schema value, table override flag,
        //  table-level window, expected resolved window in seconds)
        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            // Apply the table-level window when the case specifies one.
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }
1172
    /// Scheduling compaction for a region with nothing to compact should
    /// complete immediately with 0 compacted files and leave no region status
    /// behind.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // The waiter resolves right away with 0 compacted files.
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1235
    /// While a compaction is running, later requests for the same region wait;
    /// when the running task finishes, the next compaction is scheduled.
    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for next compaction and removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The task is pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // No new job scheduled yet: the region already has a running task, so
        // the waiter is queued on the region status instead.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished and schedule next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // 5 files for next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The task is pending.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        // Still 2 jobs: the new request waits behind the in-flight one.
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }
1368
    /// A manual (StrictWindow) compaction issued while a regular compaction is
    /// in progress becomes a pending request and is scheduled once the running
    /// task finishes.
    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // 5 files for next compaction and removes old files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule another manual compaction.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Current job num should be 1 since compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        // The manual request is parked as the region's pending request.
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished and schedule next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // The pending manual request has been promoted to a scheduled job.
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
1477
    /// Compaction requests for a region in staging mode are bypassed: the
    /// waiter resolves immediately with 0 and no region status is recorded.
    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and manifest context for staging mode
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create staging manifest context using the same pattern as SchedulerEnv
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Test regular compaction bypass in staging mode
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        // A bypassed compaction reports 0 compacted files.
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }
1535
    /// A DDL request added while the region has a compaction status is queued
    /// as pending for that region.
    #[tokio::test]
    async fn test_add_ddl_request_to_pending() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        // Simulate an in-progress compaction by inserting a region status.
        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
    }
1564
    /// Pending DDL requests must be failed (their waiters receive an error)
    /// when the region's compaction fails.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_compaction_failed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        // Simulate an in-progress compaction by inserting a region status.
        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler
            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));

        // The pending DDL is removed and its waiter receives the error.
        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
1599
    /// Pending DDL requests must be failed (their waiters receive an error)
    /// when the region is closed.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_closed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        // Simulate an in-progress compaction by inserting a region status.
        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_closed(region_id);

        // The pending DDL is removed and its waiter receives the error.
        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
1633
1634    #[tokio::test]
1635    async fn test_pending_ddl_request_failed_on_region_dropped() {
1636        let env = SchedulerEnv::new().await;
1637        let (tx, _rx) = mpsc::channel(4);
1638        let mut scheduler = env.mock_compaction_scheduler(tx);
1639        let builder = VersionControlBuilder::new();
1640        let version_control = Arc::new(builder.build());
1641        let region_id = builder.region_id();
1642
1643        scheduler.region_status.insert(
1644            region_id,
1645            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1646        );
1647
1648        let (output_tx, output_rx) = oneshot::channel();
1649        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1650            region_id,
1651            sender: OptionOutputTx::from(output_tx),
1652            request: crate::request::DdlRequest::EnterStaging(
1653                store_api::region_request::EnterStagingRequest {
1654                    partition_directive:
1655                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1656                },
1657            ),
1658        });
1659
1660        assert!(scheduler.has_pending_ddls(region_id));
1661        scheduler.on_region_dropped(region_id);
1662
1663        assert!(!scheduler.has_pending_ddls(region_id));
1664        let result = output_rx.await.unwrap();
1665        assert_matches!(result, Err(_));
1666    }
1667
1668    #[tokio::test]
1669    async fn test_pending_ddl_request_failed_on_region_truncated() {
1670        let env = SchedulerEnv::new().await;
1671        let (tx, _rx) = mpsc::channel(4);
1672        let mut scheduler = env.mock_compaction_scheduler(tx);
1673        let builder = VersionControlBuilder::new();
1674        let version_control = Arc::new(builder.build());
1675        let region_id = builder.region_id();
1676
1677        scheduler.region_status.insert(
1678            region_id,
1679            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1680        );
1681
1682        let (output_tx, output_rx) = oneshot::channel();
1683        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1684            region_id,
1685            sender: OptionOutputTx::from(output_tx),
1686            request: crate::request::DdlRequest::EnterStaging(
1687                store_api::region_request::EnterStagingRequest {
1688                    partition_directive:
1689                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1690                },
1691            ),
1692        });
1693
1694        assert!(scheduler.has_pending_ddls(region_id));
1695        scheduler.on_region_truncated(region_id);
1696
1697        assert!(!scheduler.has_pending_ddls(region_id));
1698        let result = output_rx.await.unwrap();
1699        assert_matches!(result, Err(_));
1700    }
1701
1702    #[tokio::test]
1703    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
1704        let job_scheduler = Arc::new(VecScheduler::default());
1705        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1706        let (tx, _rx) = mpsc::channel(4);
1707        let mut scheduler = env.mock_compaction_scheduler(tx);
1708        let builder = VersionControlBuilder::new();
1709        let version_control = Arc::new(builder.build());
1710        let region_id = builder.region_id();
1711        let manifest_ctx = env
1712            .mock_manifest_context(version_control.current().version.metadata.clone())
1713            .await;
1714        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1715
1716        scheduler.region_status.insert(
1717            region_id,
1718            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1719        );
1720
1721        let (output_tx, _output_rx) = oneshot::channel();
1722        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1723            region_id,
1724            sender: OptionOutputTx::from(output_tx),
1725            request: crate::request::DdlRequest::EnterStaging(
1726                store_api::region_request::EnterStagingRequest {
1727                    partition_directive:
1728                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1729                },
1730            ),
1731        });
1732
1733        let pending_ddls = scheduler
1734            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1735            .await;
1736
1737        assert_eq!(pending_ddls.len(), 1);
1738        assert!(!scheduler.has_pending_ddls(region_id));
1739        assert!(!scheduler.region_status.contains_key(&region_id));
1740        assert_eq!(job_scheduler.num_jobs(), 0);
1741    }
1742
1743    #[tokio::test]
1744    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
1745        let env = SchedulerEnv::new().await;
1746        let (tx, _rx) = mpsc::channel(4);
1747        let mut scheduler = env.mock_compaction_scheduler(tx);
1748        let builder = VersionControlBuilder::new();
1749        let version_control = Arc::new(builder.build());
1750        let region_id = builder.region_id();
1751        let manifest_ctx = env
1752            .mock_manifest_context(version_control.current().version.metadata.clone())
1753            .await;
1754        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1755
1756        let (manual_tx, manual_rx) = oneshot::channel();
1757        let mut status =
1758            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
1759        status.set_pending_request(PendingCompaction {
1760            options: compact_request::Options::Regular(Default::default()),
1761            waiter: OptionOutputTx::from(manual_tx),
1762            max_parallelism: 1,
1763        });
1764        scheduler.region_status.insert(region_id, status);
1765
1766        let (ddl_tx, _ddl_rx) = oneshot::channel();
1767        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1768            region_id,
1769            sender: OptionOutputTx::from(ddl_tx),
1770            request: crate::request::DdlRequest::EnterStaging(
1771                store_api::region_request::EnterStagingRequest {
1772                    partition_directive:
1773                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1774                },
1775            ),
1776        });
1777
1778        let pending_ddls = scheduler
1779            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1780            .await;
1781
1782        assert_eq!(pending_ddls.len(), 1);
1783        assert!(!scheduler.region_status.contains_key(&region_id));
1784        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
1785    }
1786
1787    #[tokio::test]
1788    async fn test_on_compaction_finished_returns_empty_when_region_absent() {
1789        let env = SchedulerEnv::new().await;
1790        let (tx, _rx) = mpsc::channel(4);
1791        let mut scheduler = env.mock_compaction_scheduler(tx);
1792        let builder = VersionControlBuilder::new();
1793        let region_id = builder.region_id();
1794        let version_control = Arc::new(builder.build());
1795        let manifest_ctx = env
1796            .mock_manifest_context(version_control.current().version.metadata.clone())
1797            .await;
1798        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1799
1800        let pending_ddls = scheduler
1801            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1802            .await;
1803
1804        assert!(pending_ddls.is_empty());
1805    }
1806
1807    #[tokio::test]
1808    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
1809        let env = SchedulerEnv::new()
1810            .await
1811            .scheduler(Arc::new(FailingScheduler));
1812        let (tx, _rx) = mpsc::channel(4);
1813        let mut scheduler = env.mock_compaction_scheduler(tx);
1814        let mut builder = VersionControlBuilder::new();
1815        let end = 1000 * 1000;
1816        let version_control = Arc::new(
1817            builder
1818                .push_l0_file(0, end)
1819                .push_l0_file(10, end)
1820                .push_l0_file(50, end)
1821                .push_l0_file(80, end)
1822                .push_l0_file(90, end)
1823                .build(),
1824        );
1825        let region_id = builder.region_id();
1826        let manifest_ctx = env
1827            .mock_manifest_context(version_control.current().version.metadata.clone())
1828            .await;
1829        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1830
1831        let (manual_tx, manual_rx) = oneshot::channel();
1832        let mut status =
1833            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
1834        status.set_pending_request(PendingCompaction {
1835            options: compact_request::Options::Regular(Default::default()),
1836            waiter: OptionOutputTx::from(manual_tx),
1837            max_parallelism: 1,
1838        });
1839        scheduler.region_status.insert(region_id, status);
1840
1841        let (ddl_tx, ddl_rx) = oneshot::channel();
1842        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1843            region_id,
1844            sender: OptionOutputTx::from(ddl_tx),
1845            request: crate::request::DdlRequest::EnterStaging(
1846                store_api::region_request::EnterStagingRequest {
1847                    partition_directive:
1848                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1849                },
1850            ),
1851        });
1852
1853        let pending_ddls = scheduler
1854            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1855            .await;
1856
1857        assert!(pending_ddls.is_empty());
1858        assert!(!scheduler.region_status.contains_key(&region_id));
1859        assert!(manual_rx.await.is_err());
1860        assert_matches!(ddl_rx.await.unwrap(), Err(_));
1861    }
1862
1863    #[tokio::test]
1864    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
1865        let env = SchedulerEnv::new().await;
1866        let (tx, _rx) = mpsc::channel(4);
1867        let mut scheduler = env.mock_compaction_scheduler(tx);
1868        let builder = VersionControlBuilder::new();
1869        let version_control = Arc::new(builder.build());
1870        let region_id = builder.region_id();
1871        let manifest_ctx = env
1872            .mock_manifest_context(version_control.current().version.metadata.clone())
1873            .await;
1874        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1875
1876        scheduler.region_status.insert(
1877            region_id,
1878            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1879        );
1880
1881        let pending_ddls = scheduler
1882            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1883            .await;
1884
1885        assert!(pending_ddls.is_empty());
1886        assert!(!scheduler.region_status.contains_key(&region_id));
1887    }
1888
1889    #[tokio::test]
1890    async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
1891        let env = SchedulerEnv::new()
1892            .await
1893            .scheduler(Arc::new(FailingScheduler));
1894        let (tx, _rx) = mpsc::channel(4);
1895        let mut scheduler = env.mock_compaction_scheduler(tx);
1896        let mut builder = VersionControlBuilder::new();
1897        let end = 1000 * 1000;
1898        let version_control = Arc::new(
1899            builder
1900                .push_l0_file(0, end)
1901                .push_l0_file(10, end)
1902                .push_l0_file(50, end)
1903                .push_l0_file(80, end)
1904                .push_l0_file(90, end)
1905                .build(),
1906        );
1907        let region_id = builder.region_id();
1908        let manifest_ctx = env
1909            .mock_manifest_context(version_control.current().version.metadata.clone())
1910            .await;
1911        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1912
1913        scheduler.region_status.insert(
1914            region_id,
1915            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1916        );
1917
1918        let pending_ddls = scheduler
1919            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1920            .await;
1921
1922        assert!(pending_ddls.is_empty());
1923        assert!(!scheduler.region_status.contains_key(&region_id));
1924    }
1925
1926    #[tokio::test]
1927    async fn test_concurrent_memory_competition() {
1928        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
1929        let barrier = Arc::new(Barrier::new(3));
1930        let mut handles = vec![];
1931
1932        // Spawn 3 tasks competing for memory, each trying to acquire 2MB
1933        for _i in 0..3 {
1934            let mgr = manager.clone();
1935            let bar = barrier.clone();
1936            let handle = tokio::spawn(async move {
1937                bar.wait().await; // Synchronize start
1938                mgr.try_acquire(2 * 1024 * 1024)
1939            });
1940            handles.push(handle);
1941        }
1942
1943        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
1944            .await
1945            .into_iter()
1946            .map(|r| r.unwrap())
1947            .collect();
1948
1949        // Only 1 should succeed (3MB limit, 2MB request, can only fit one)
1950        let succeeded = results.iter().filter(|r| r.is_some()).count();
1951        let failed = results.iter().filter(|r| r.is_none()).count();
1952
1953        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
1954        assert_eq!(failed, 2, "Expected 2 tasks to fail");
1955
1956        // Clean up
1957        drop(results);
1958        assert_eq!(manager.used_bytes(), 0);
1959    }
1960}