// mito2/compaction.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
    ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
    RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    memory_manager: Arc<CompactionMemoryManager>,
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    /// Plugins for the compaction scheduler.
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
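    ///
    /// The flow, in brief:
    /// - Regions in staging state are skipped; the waiter is notified immediately.
    /// - If the region is already compacting, a regular request merges its waiter into
    ///   the running task, while a strict-window (manual) request is parked as the
    ///   pending request and re-scheduled once the current task finishes.
    /// - Otherwise a new [CompactionRequest] is built and handed to
    ///   `schedule_compaction_request`.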
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction if the region is in staging state.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Incoming compaction request is manually triggered.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        let result = match self
            .schedule_compaction_request(request, compact_options)
            .await
        {
            Ok(Some(active_compaction)) => {
                // Publish CompactionStatus only after a task has been accepted by the scheduler.
                // This avoids exposing a half-initialized region status that could collect pending
                // DDL/compaction state even though no compaction is actually running.
                status.active_compaction = Some(active_compaction);
                self.region_status.insert(region_id, status);

                Ok(())
            }
            Ok(None) => Ok(()),
            Err(e) => Err(e),
        };

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Handles the pending manual compaction request for the region, if any.
    ///
    /// Returns `true` if the caller should return early, `false` otherwise.
    pub(crate) async fn handle_pending_compaction_request(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> bool {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return true;
        };

        // If there is a pending manual compaction request, schedule it
        // and defer returning the pending DDL requests to the caller.
        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
            return false;
        };

        let PendingCompaction {
            options,
            waiter,
            max_parallelism,
        } = pending_request;

        let request = {
            status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            )
        };

        match self.schedule_compaction_request(request, options).await {
            Ok(Some(active_compaction)) => {
                let status = self.region_status.get_mut(&region_id).unwrap();
                status.active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
                true
            }
            Ok(None) => {
                // We still need to handle the pending DDL requests.
                // So we can't return early here.
                false
            }
            Err(e) => {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
                true
            }
        }
    }

    /// Notifies the scheduler that the compaction job is finished successfully.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return Vec::new();
        };
        status.clear_running_task();

        // If there is a pending compaction request, handle it first
        // and defer returning the pending DDL requests to the caller.
        if self
            .handle_pending_compaction_request(
                region_id,
                manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
        {
            return Vec::new();
        }

        let Some(status) = self.region_status.get_mut(&region_id) else {
            // The region status might be removed by the previous steps.
            // So we return empty DDL requests.
            return Vec::new();
        };

        for waiter in std::mem::take(&mut status.waiters) {
            waiter.send(Ok(0));
        }

        // If there are pending DDL requests, run them.
        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
        if !pending_ddl_requests.is_empty() {
            self.region_status.remove(&region_id);
            // Return the pending DDL requests to the caller
            // and skip scheduling the next compaction task.
            return pending_ddl_requests;
        }

        // We should always try to compact the region until the picker returns `None`.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        // Try to schedule the next compaction task for this region.
        match self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            Ok(Some(active_compaction)) => {
                self.region_status
                    .get_mut(&region_id)
                    .unwrap()
                    .active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled next compaction for region id: {}",
                    region_id
                );
            }
            Ok(None) => {
                // No further compaction tasks can be scheduled; clean up the `CompactionStatus` for this region.
                // All DDL requests and pending compaction requests have already been processed.
                // Safe to remove the region from status tracking.
                self.region_status.remove(&region_id);
            }
            Err(e) => {
                error!(e; "Failed to schedule next compaction for region {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
            }
        }

        Vec::new()
    }

    /// Notifies the scheduler that the compaction job is cancelled cooperatively.
    pub(crate) async fn on_compaction_cancelled(
        &mut self,
        region_id: RegionId,
    ) -> Vec<SenderDdlRequest> {
        self.remove_region_on_cancel(region_id)
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region didn't request compaction.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        debug!(
            "Added pending DDL request for region: {}, ddl: {:?}",
            request.region_id, request.request
        );
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddl_requests.push(request);
    }

    #[cfg(test)]
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map(|status| !status.pending_ddl_requests.is_empty())
            .unwrap_or(false);
        debug!(
            "Checked pending DDL requests for region: {}, has_pending: {}",
            region_id, has_pending
        );
        has_pending
    }

    pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return RequestCancelResult::NotRunning;
        };

        status.request_cancel()
    }

    /// Schedules a compaction request.
    ///
    /// Returns the active compaction state if the request is scheduled successfully.
    /// Returns `None` if no compaction task can be scheduled for this region.
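    ///
    /// If the resolved options enable remote compaction, the job is first offered to the
    /// [RemoteJobSchedulerRef] plugin; when remote scheduling fails, the request either
    /// falls back to a local task or fails outright, depending on `fallback_to_local()`.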
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<Option<ActiveCompaction>> {
        let region_id = request.region_id();
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let Some(picker_output) = picker_output else {
            // Nothing to compact, we are done. Notify all waiters as we consume the compaction request.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(None);
        };

        // If specified to run compaction remotely, we schedule the compaction job remotely.
        // It will fall back to local compaction if there is no remote job scheduler.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(Some(ActiveCompaction::Remote));
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Return the waiters back to the caller for local compaction.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
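        // The estimate below is handed to the task together with `memory_manager` and
        // `memory_policy`, so the task can budget its memory before running.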
        let estimated_bytes = estimate_compaction_bytes(&picker_output);

        let cancel_handle = Arc::new(CancellationHandle::default());
        let state = LocalCompactionState::new(cancel_handle.clone());
        let local_compaction_task = Box::new(CompactionTaskImpl {
            state: state.clone(),
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| Some(ActiveCompaction::Local { state }))
    }

    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .inspect_err(
                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
            )
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove this region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notify all pending tasks.
        status.on_failure(err);
    }

    fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.remove(&region_id) else {
            return Vec::new();
        };

        status.on_cancel()
    }
}

#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    cancel_handle: Arc<CancellationHandle>,
    commit_started: Arc<Mutex<bool>>,
}

#[derive(Debug)]
enum ActiveCompaction {
    Local { state: LocalCompactionState },
    Remote,
}

impl LocalCompactionState {
    fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
        Self {
            cancel_handle,
            commit_started: Arc::new(Mutex::new(false)),
        }
    }

    /// Returns the cancellation handle for this compaction task.
    pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
        self.cancel_handle.clone()
    }

    /// Marks the compaction task as having started to commit, meaning the task is in
    /// its final stage and is about to update the region version and manifest.
    /// Cancellation requests are rejected after this method succeeds.
    ///
    /// Returns `true` if the commit may proceed, or `false` if the task has already
    /// been cancelled.
    pub(crate) fn mark_commit_started(&self) -> bool {
        let mut commit_started = self.commit_started.lock().unwrap();
        if self.cancel_handle.is_cancelled() {
            return false;
        }
        *commit_started = true;
        true
    }

    /// Requests cancellation of this compaction task.
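    ///
    /// A minimal sketch of the cancellation protocol (assumes a task is running):
    ///
    /// ```ignore
    /// match state.request_cancel() {
    ///     RequestCancelResult::CancelIssued => { /* cancel signal raised */ }
    ///     RequestCancelResult::AlreadyCancelling => { /* a cancel is already in flight */ }
    ///     RequestCancelResult::TooLateToCancel => { /* commit already started */ }
    ///     RequestCancelResult::NotRunning => unreachable!("not returned by this method"),
    /// }
    /// ```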
    pub(crate) fn request_cancel(&self) -> RequestCancelResult {
        // The cancel handle must be checked under the `commit_started` lock to avoid
        // racing between cancellation and commit.
        let commit_started = self.commit_started.lock().unwrap();
        if *commit_started {
            return RequestCancelResult::TooLateToCancel;
        }
        if self.cancel_handle.is_cancelled() {
            return RequestCancelResult::AlreadyCancelling;
        }

        self.cancel_handle.cancel();
        RequestCancelResult::CancelIssued
    }
}
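/// Outcome of a cancellation request: `CancelIssued` means the cancel signal was just
/// raised, `AlreadyCancelling` means a previous cancel is still in flight,
/// `TooLateToCancel` means the task has started to commit (or runs remotely), and
/// `NotRunning` means the region has no active compaction.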
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    CancelIssued,
    AlreadyCancelling,
    TooLateToCancel,
    NotRunning,
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // We are shutting down so notify all pending tasks.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

/// Finds compaction options and TTL together, using a single metadata fetch to reduce RTT.
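///
/// Resolution order, as implemented below: region-level options win when
/// `compaction_override` is set (and a region TTL, when also present, short-circuits the
/// fetch entirely); otherwise database-level `compaction.*` keys from the schema options
/// are parsed, falling back to the region options on any miss or parse error.
///
/// ```ignore
/// // Hypothetical call site; the manager comes from the engine.
/// let (compaction_opts, ttl) =
///     find_dynamic_options(table_id, &region_options, &schema_metadata_manager).await?;
/// ```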
async fn find_dynamic_options(
    table_id: TableId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((region_options.compaction.clone(), ttl));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = if let Some(ttl) = region_options.ttl {
        debug!(
            "Use region TTL directly for table {}: ttl={:?}",
            table_id, region_options.ttl
        );
        ttl
    } else {
        db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into()
    };

    let compaction = if !region_options.compaction_override {
        if let Some(schema_opts) = db_options {
            let map: HashMap<String, String> = schema_opts
                .extra_options
                .iter()
                .filter_map(|(k, v)| {
                    if k.starts_with("compaction.") {
                        Some((k.clone(), v.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            if map.is_empty() {
                region_options.compaction.clone()
            } else {
                crate::region::options::RegionOptions::try_from(&map)
                    .map(|o| o.compaction)
                    .unwrap_or_else(|e| {
                        error!(e; "Failed to create RegionOptions from map");
                        region_options.compaction.clone()
                    })
            }
        } else {
            debug!(
                "DB options is None for table {}, use region compaction: compaction={:?}",
                table_id, region_options.compaction
            );
            region_options.compaction.clone()
        }
    } else {
        debug!(
            "Compaction options are overridden for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Pending waiters for compaction.
    waiters: Vec<OutputTx>,
    /// Pending compaction that should run as soon as the current compaction task finishes.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// Active compaction state.
    active_compaction: Option<ActiveCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
            pending_ddl_requests: Vec::new(),
            active_compaction: None,
        }
    }

    #[cfg(test)]
    fn start_local_task(&mut self) -> LocalCompactionState {
        let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
        self.active_compaction = Some(ActiveCompaction::Local {
            state: state.clone(),
        });
        state
    }

    #[cfg(test)]
    fn start_remote_task(&mut self) {
        self.active_compaction = Some(ActiveCompaction::Remote);
    }

    fn request_cancel(&mut self) -> RequestCancelResult {
        let Some(active_compaction) = &self.active_compaction else {
            return RequestCancelResult::NotRunning;
        };

        match active_compaction {
            ActiveCompaction::Local { state, .. } => state.request_cancel(),
            ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
        }
    }

    fn clear_running_task(&mut self) -> bool {
        self.active_compaction.take().is_some()
    }

    /// Merges the waiter into the waiters of the ongoing compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending compaction request, replacing the current value if one already exists.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }

        for pending_ddl in self.pending_ddl_requests {
            pending_ddl
                .sender
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    #[must_use]
    fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
        for waiter in self.waiters.drain(..) {
            waiter.send(CompactionCancelledSnafu.fail());
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction.waiter.send(
                Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }),
            );
        }

        std::mem::take(&mut self.pending_ddl_requests)
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending compaction waiters.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}

/// A serialized version of [CompactionOutput] that replaces [FileHandle] with [FileMeta].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

/// Builder that creates a [BoxedRecordBatchStream] for compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input()?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self) -> Result<ScanInput> {
        let mapper = ProjectionMapper::all(&self.metadata)?;
        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            // We use a special cache strategy for compaction.
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            // We ignore file-not-found errors during compaction.
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode);

        // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
        // by converting time ranges into a predicate.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

/// Converts a time range to predicates so that rows outside the range are filtered out.
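///
/// A fully bounded range `[start, end)` becomes `ts >= start AND ts < end`; a half-open
/// range yields only the corresponding bound, and an unbounded range yields an empty
/// [PredicateGroup].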
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column's type must be a valid timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

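/// Converts a timestamp into a DataFusion literal in the time index column's unit.
///
/// For example (values chosen for illustration), a timestamp of 5 seconds against a
/// millisecond column yields `ScalarValue::TimestampMillisecond(Some(5000), None)`;
/// a conversion that overflows the target unit fails with
/// [TimeRangePredicateOverflowSnafu].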
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
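/// Roughly, these are the files whose data is entirely older than `now - ttl`; the
/// exact predicate lives in [LevelMeta::get_expired_files].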
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
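///
/// For example, three inputs whose largest row groups decompress to 8 MiB, 16 MiB, and
/// 8 MiB yield an estimate of 32 MiB in total.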
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

/// Pending compaction request that should run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::time::Duration;

    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_time::DatabaseTimeToLive;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::error::InvalidSchedulerStateSnafu;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::schedule::scheduler::{Job, Scheduler};
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    struct FailingScheduler;

    #[async_trait::async_trait]
    impl Scheduler for FailingScheduler {
        fn schedule(&self, _job: Job) -> Result<()> {
            InvalidSchedulerStateSnafu.fail()
        }

        async fn stop(&self, _await_termination: bool) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        // Register table without ttl but with db-level compaction options.
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Only one file, the picker won't compact it.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Add 5 files for the next compaction and remove the old files.
1447        apply_edit(
1448            &version_control,
1449            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1450            &file_metas,
1451            purger.clone(),
1452        );
1453        // The task is pending.
1454        let (tx, _rx) = oneshot::channel();
1455        scheduler
1456            .schedule_compaction(
1457                region_id,
1458                compact_request::Options::Regular(Default::default()),
1459                &version_control,
1460                &env.access_layer,
1461                OptionOutputTx::new(Some(OutputTx::new(tx))),
1462                &manifest_ctx,
1463                schema_metadata_manager.clone(),
1464                1,
1465            )
1466            .await
1467            .unwrap();
1468        assert_eq!(1, scheduler.region_status.len());
1469        assert_eq!(1, job_scheduler.num_jobs());
1470        assert!(
1471            !scheduler
1472                .region_status
1473                .get(&builder.region_id())
1474                .unwrap()
1475                .waiters
1476                .is_empty()
1477        );
1478
1479        // On compaction finished and schedule next compaction.
1480        scheduler
1481            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1482            .await;
1483        assert_eq!(1, scheduler.region_status.len());
1484        assert_eq!(2, job_scheduler.num_jobs());
1485
1486        // 5 files for next compaction.
1487        apply_edit(
1488            &version_control,
1489            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1490            &[],
1491            purger.clone(),
1492        );
1493        let (tx, _rx) = oneshot::channel();
1494        // The task is pending.
1495        scheduler
1496            .schedule_compaction(
1497                region_id,
1498                compact_request::Options::Regular(Default::default()),
1499                &version_control,
1500                &env.access_layer,
1501                OptionOutputTx::new(Some(OutputTx::new(tx))),
1502                &manifest_ctx,
1503                schema_metadata_manager,
1504                1,
1505            )
1506            .await
1507            .unwrap();
1508        assert_eq!(2, job_scheduler.num_jobs());
1509        assert!(
1510            !scheduler
1511                .region_status
1512                .get(&builder.region_id())
1513                .unwrap()
1514                .waiters
1515                .is_empty()
1516        );
1517    }
1518
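    // Verifies that a failed scheduling attempt does not leave a stale entry in
    // `region_status`.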
    #[tokio::test]
    async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
        common_telemetry::init_default_ut_logging();
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let result = scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await;

        assert!(result.is_err());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }

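    // Verifies that a manual (strict window) compaction requested while another
    // compaction is in progress is stored as a pending request and is scheduled
    // once the running compaction finishes.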
    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Replace the old files with 5 new files for the next compaction.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedule a manual compaction while the first one is running.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // The job count should still be 1 since a compaction is in progress.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Finishing the compaction should schedule the pending manual compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

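    // Verifies that compaction is bypassed while the region is in staging mode:
    // the waiter resolves immediately and no region status is recorded.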
    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        // Create version control and a manifest context for staging mode.
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a staging manifest context using the same pattern as SchedulerEnv.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // A regular compaction should be bypassed in staging mode.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0); // A bypassed compaction affects zero rows.
        assert_eq!(0, scheduler.region_status.len());
    }

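    // Verifies that a DDL request arriving while a local compaction task is
    // running is queued as a pending DDL.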
    #[tokio::test]
    async fn test_add_ddl_request_to_pending() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
    }

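    // Walks through the cancellation state machine: the first cancel is issued,
    // repeated cancels report `AlreadyCancelling` (even after the commit attempt
    // fails), and clearing the running task yields `NotRunning`.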
    #[tokio::test]
    async fn test_request_cancel_state_transitions() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let region_id = builder.region_id();
        let version_control = Arc::new(builder.build());
        let mut status =
            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
        let state = status.start_local_task();

        assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
        assert!(state.cancel_handle().is_cancelled());
        assert_eq!(
            status.request_cancel(),
            RequestCancelResult::AlreadyCancelling
        );

        assert!(!state.mark_commit_started());
        assert_eq!(
            status.request_cancel(),
            RequestCancelResult::AlreadyCancelling
        );

        assert!(status.clear_running_task());
        assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
    }

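    // A remote compaction cannot be cancelled once started, and the active
    // compaction entry is kept.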
    #[tokio::test]
    async fn test_request_cancel_remote_compaction_is_too_late() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let region_id = builder.region_id();
        let version_control = Arc::new(builder.build());
        let mut status =
            CompactionStatus::new(region_id, version_control, env.access_layer.clone());

        status.start_remote_task();

        assert_eq!(
            status.request_cancel(),
            RequestCancelResult::TooLateToCancel
        );
        assert!(status.active_compaction.is_some());
    }

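    // Cancelling a compaction hands the pending DDL requests back to the caller
    // and removes the region status without scheduling new jobs.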
    #[tokio::test]
    async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let _manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.has_pending_ddls(region_id));
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
    }

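    // On cancellation, pending DDL requests are returned to the caller while the
    // pending manual compaction receives an error, i.e. DDLs take priority.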
    #[tokio::test]
    async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let _manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        let status = scheduler.region_status.get_mut(&region_id).unwrap();
        status.start_local_task();
        let (manual_tx, manual_rx) = oneshot::channel();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
        assert_matches!(manual_rx.await.unwrap(), Err(_));
    }

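    // The following four tests verify that pending DDL requests receive an error
    // when the compaction fails or when the region is closed, dropped, or
    // truncated.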
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_compaction_failed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler
            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_closed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_closed(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_dropped() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_dropped(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_truncated() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_truncated(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }

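    // Finishing a compaction returns the pending DDL requests instead of
    // scheduling a follow-up compaction.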
    #[tokio::test]
    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.has_pending_ddls(region_id));
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
    }

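    // A pending manual compaction that turns out to be a no-op resolves its
    // waiter with zero affected rows, and the pending DDLs are still returned.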
    #[tokio::test]
    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.start_local_task();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, _ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
    }

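    // Finishing a compaction for a region without a status entry is a no-op.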
    #[tokio::test]
    async fn test_on_compaction_finished_returns_empty_when_region_absent() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let region_id = builder.region_id();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
    }

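    // If replaying the pending manual compaction fails to schedule, the region
    // status is cleaned up, the manual waiter is dropped, and the pending DDL
    // observes the error.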
    #[tokio::test]
    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.start_local_task();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert!(manual_rx.await.is_err());
        assert_matches!(ddl_rx.await.unwrap(), Err(_));
    }

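    // When the follow-up schedule is a no-op (nothing to compact), the region
    // status is removed.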
    #[tokio::test]
    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }

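    // When the follow-up schedule fails, the region status is removed as well.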
    #[tokio::test]
    async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }

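    // Exercises the compaction memory manager under contention: three tasks race
    // for a 3 MiB budget with 2 MiB requests, so exactly one can win.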
    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3 MiB limit.
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        // Spawn 3 tasks competing for memory, each trying to acquire 2 MiB.
        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await; // Synchronize the start.
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        // Only 1 should succeed: with a 3 MiB limit, only one 2 MiB request fits.
        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        // Dropping the guards should release all reserved memory.
        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}