Skip to main content

mito2/
compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::{Arc, Mutex};
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use arrow_schema::Schema;
33use common_base::Plugins;
34use common_base::cancellation::CancellationHandle;
35use common_memory_manager::OnExhaustedPolicy;
36use common_meta::key::SchemaMetadataManagerRef;
37use common_telemetry::{debug, error, info, warn};
38use common_time::range::TimestampRange;
39use common_time::timestamp::TimeUnit;
40use common_time::{TimeToLive, Timestamp};
41use datafusion_common::ScalarValue;
42use datafusion_expr::Expr;
43use datatypes::extension::json::is_json_extension_type;
44use datatypes::schema::ext::ArrowSchemaExt;
45use datatypes::types::json_type::JsonNativeType;
46use parquet::arrow::parquet_to_arrow_schema;
47use parquet::file::metadata::PageIndexPolicy;
48use serde::{Deserialize, Serialize};
49use snafu::{OptionExt, ResultExt};
50use store_api::metadata::RegionMetadataRef;
51use store_api::storage::RegionId;
52use task::MAX_PARALLEL_COMPACTION;
53use tokio::sync::mpsc::{self, Sender};
54
55use crate::access_layer::AccessLayerRef;
56use crate::cache::{CacheManagerRef, CacheStrategy};
57use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
58use crate::compaction::memory_manager::CompactionMemoryManager;
59use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
60use crate::compaction::task::CompactionTaskImpl;
61use crate::config::MitoConfig;
62use crate::error::{
63    CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
64    GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
65    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
66    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
67};
68use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
69use crate::read::FlatSource;
70use crate::read::flat_projection::FlatProjectionMapper;
71use crate::read::read_columns::ReadColumns;
72use crate::read::scan_region::{PredicateGroup, ScanInput};
73use crate::read::seq_scan::SeqScan;
74use crate::region::options::{MergeMode, RegionOptions};
75use crate::region::version::VersionControlRef;
76use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
77use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
78use crate::schedule::remote_job_scheduler::{
79    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
80};
81use crate::schedule::scheduler::SchedulerRef;
82use crate::sst::file::{FileHandle, FileMeta, Level};
83use crate::sst::parquet::reader::MetadataCacheMetrics;
84use crate::sst::version::LevelMeta;
85use crate::worker::WorkerListener;
86
/// Region compaction request.
///
/// Carries everything needed to plan and run one compaction of a region; it is consumed by
/// `CompactionScheduler::schedule_compaction_request`.
pub struct CompactionRequest {
    /// Engine configuration.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Version of the region the compaction is planned against.
    pub(crate) current_version: CompactionVersion,
    /// Access layer of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send notification to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager handed to the compaction region.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Worker listener passed on to the compaction task.
    pub(crate) listener: WorkerListener,
    /// Used to look up schema (database) options, e.g. dynamic compaction options and TTL.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Maximum parallelism for this compaction.
    pub(crate) max_parallelism: usize,
}
104
105impl CompactionRequest {
106    pub(crate) fn region_id(&self) -> RegionId {
107        self.current_version.metadata.region_id
108    }
109}
110
/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    /// Background scheduler that local compaction tasks are submitted to.
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager shared with compaction requests.
    cache_manager: CacheManagerRef,
    /// Engine configuration.
    engine_config: Arc<MitoConfig>,
    /// Memory manager shared by all local compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Memory policy handed to each local compaction task (presumably applied when the
    /// compaction memory budget is exhausted — see `OnExhaustedPolicy`).
    memory_policy: OnExhaustedPolicy,
    /// Listener notified about compaction scheduling events.
    listener: WorkerListener,
    /// Plugins for the compaction scheduler (e.g. an optional remote job scheduler).
    plugins: Plugins,
}
126
127impl CompactionScheduler {
128    #[allow(clippy::too_many_arguments)]
129    pub(crate) fn new(
130        scheduler: SchedulerRef,
131        request_sender: Sender<WorkerRequestWithTime>,
132        cache_manager: CacheManagerRef,
133        engine_config: Arc<MitoConfig>,
134        listener: WorkerListener,
135        plugins: Plugins,
136        memory_manager: Arc<CompactionMemoryManager>,
137        memory_policy: OnExhaustedPolicy,
138    ) -> Self {
139        Self {
140            scheduler,
141            region_status: HashMap::new(),
142            request_sender,
143            cache_manager,
144            engine_config,
145            memory_manager,
146            memory_policy,
147            listener,
148            plugins,
149        }
150    }
151
152    /// Schedules a compaction for the region.
153    /// Returns whether a compaction is scheduled.
154    #[allow(clippy::too_many_arguments)]
155    pub(crate) async fn schedule_compaction(
156        &mut self,
157        region_id: RegionId,
158        compact_options: compact_request::Options,
159        version_control: &VersionControlRef,
160        access_layer: &AccessLayerRef,
161        waiter: OptionOutputTx,
162        manifest_ctx: &ManifestContextRef,
163        schema_metadata_manager: SchemaMetadataManagerRef,
164        max_parallelism: usize,
165    ) -> Result<bool> {
166        // skip compaction if region is in staging state
167        let current_state = manifest_ctx.current_state();
168        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
169            info!(
170                "Skipping compaction for region {} in staging mode, options: {:?}",
171                region_id, compact_options
172            );
173            waiter.send(Ok(0));
174            return Ok(false);
175        }
176
177        if let Some(status) = self.region_status.get_mut(&region_id) {
178            match compact_options {
179                Options::Regular(_) => {
180                    // Region is compacting. Add the waiter to pending list.
181                    status.merge_waiter(waiter);
182                }
183                options @ Options::StrictWindow(_) => {
184                    // Incoming compaction request is manually triggered.
185                    status.set_pending_request(PendingCompaction {
186                        options,
187                        waiter,
188                        max_parallelism,
189                    });
190                    info!(
191                        "Region {} is compacting, manually compaction will be re-scheduled.",
192                        region_id
193                    );
194                }
195            }
196            return Ok(false);
197        }
198
199        // The region can compact directly.
200        let mut status =
201            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
202        let request = status.new_compaction_request(
203            self.request_sender.clone(),
204            waiter,
205            self.engine_config.clone(),
206            self.cache_manager.clone(),
207            manifest_ctx,
208            self.listener.clone(),
209            schema_metadata_manager,
210            max_parallelism,
211        );
212
213        match self
214            .schedule_compaction_request(request, compact_options)
215            .await
216        {
217            Ok(Some(active_compaction)) => {
218                // Publish CompactionStatus only after a task has been accepted by the scheduler.
219                // This avoids exposing a half-initialized region status that could collect pending
220                // DDL/compaction state even though no compaction is actually running.
221                status.active_compaction = Some(active_compaction);
222                self.region_status.insert(region_id, status);
223
224                self.listener.on_compaction_scheduled(region_id);
225                Ok(true)
226            }
227            Ok(None) => Ok(false),
228            Err(e) => Err(e),
229        }
230    }
231
232    // Handle pending manual compaction request for the region.
233    //
234    // Returns true if should early return, false otherwise.
235    pub(crate) async fn handle_pending_compaction_request(
236        &mut self,
237        region_id: RegionId,
238        manifest_ctx: &ManifestContextRef,
239        schema_metadata_manager: SchemaMetadataManagerRef,
240    ) -> bool {
241        let Some(status) = self.region_status.get_mut(&region_id) else {
242            return true;
243        };
244
245        // If there is a pending manual compaction request, schedule it.
246        // and defer returning the pending DDL requests to the caller.
247        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
248            return false;
249        };
250
251        let PendingCompaction {
252            options,
253            waiter,
254            max_parallelism,
255        } = pending_request;
256
257        let request = {
258            status.new_compaction_request(
259                self.request_sender.clone(),
260                waiter,
261                self.engine_config.clone(),
262                self.cache_manager.clone(),
263                manifest_ctx,
264                self.listener.clone(),
265                schema_metadata_manager,
266                max_parallelism,
267            )
268        };
269
270        match self.schedule_compaction_request(request, options).await {
271            Ok(Some(active_compaction)) => {
272                let status = self.region_status.get_mut(&region_id).unwrap();
273                status.active_compaction = Some(active_compaction);
274                debug!(
275                    "Successfully scheduled manual compaction for region id: {}",
276                    region_id
277                );
278                true
279            }
280            Ok(None) => {
281                // We still need to handle the pending DDL requests.
282                // So we can't return early here.
283                false
284            }
285            Err(e) => {
286                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
287                self.remove_region_on_failure(region_id, Arc::new(e));
288                true
289            }
290        }
291    }
292
293    /// Notifies the scheduler that the compaction job is finished successfully.
294    pub(crate) async fn on_compaction_finished(
295        &mut self,
296        region_id: RegionId,
297        manifest_ctx: &ManifestContextRef,
298        schema_metadata_manager: SchemaMetadataManagerRef,
299    ) -> Vec<SenderDdlRequest> {
300        let Some(status) = self.region_status.get_mut(&region_id) else {
301            return Vec::new();
302        };
303        status.clear_running_task();
304
305        // If there a pending compaction request, handle it first
306        // and defer returning the pending DDL requests to the caller.
307        if self
308            .handle_pending_compaction_request(
309                region_id,
310                manifest_ctx,
311                schema_metadata_manager.clone(),
312            )
313            .await
314        {
315            return Vec::new();
316        }
317
318        let Some(status) = self.region_status.get_mut(&region_id) else {
319            // The region status might be removed by the previous steps.
320            // So we return empty DDL requests.
321            return Vec::new();
322        };
323
324        for waiter in std::mem::take(&mut status.waiters) {
325            waiter.send(Ok(0));
326        }
327
328        // If there are pending DDL requests, run them.
329        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
330        if !pending_ddl_requests.is_empty() {
331            self.region_status.remove(&region_id);
332            // If there are pending DDL requests, we should return them to the caller.
333            // And skip try to schedule next compaction task.
334            return pending_ddl_requests;
335        }
336        Vec::new()
337    }
338
339    pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
340        self.region_status
341            .get(&region_id)
342            .map(|status| status.active_compaction.is_some())
343            .unwrap_or(false)
344    }
345
346    /// Schedules next compaction upon a finished compaction.
347    /// Returns whether the compaction is scheduled.
348    pub(crate) async fn schedule_next_compaction(
349        &mut self,
350        region_id: RegionId,
351        manifest_ctx: &ManifestContextRef,
352        schema_metadata_manager: SchemaMetadataManagerRef,
353    ) -> bool {
354        let Some(status) = self.region_status.get_mut(&region_id) else {
355            return false;
356        };
357
358        // We should always try to compact the region until picker returns None.
359        let request = status.new_compaction_request(
360            self.request_sender.clone(),
361            OptionOutputTx::none(),
362            self.engine_config.clone(),
363            self.cache_manager.clone(),
364            manifest_ctx,
365            self.listener.clone(),
366            schema_metadata_manager,
367            MAX_PARALLEL_COMPACTION,
368        );
369
370        // Try to schedule next compaction task for this region.
371        match self
372            .schedule_compaction_request(
373                request,
374                compact_request::Options::Regular(Default::default()),
375            )
376            .await
377        {
378            Ok(Some(active_compaction)) => {
379                self.region_status
380                    .get_mut(&region_id)
381                    .unwrap()
382                    .active_compaction = Some(active_compaction);
383                debug!(
384                    "Successfully scheduled next compaction for region id: {}",
385                    region_id
386                );
387                true
388            }
389            Ok(None) => {
390                // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region.
391                // All DDL requests and pending compaction requests have already been processed.
392                // Safe to remove the region from status tracking.
393                self.region_status.remove(&region_id);
394                false
395            }
396            Err(e) => {
397                error!(e; "Failed to schedule next compaction for region {}", region_id);
398                self.remove_region_on_failure(region_id, Arc::new(e));
399                false
400            }
401        }
402    }
403
404    /// Notifies the scheduler that the compaction job is cancelled cooperatively.
405    pub(crate) async fn on_compaction_cancelled(
406        &mut self,
407        region_id: RegionId,
408    ) -> Vec<SenderDdlRequest> {
409        self.remove_region_on_cancel(region_id)
410    }
411
412    /// Notifies the scheduler that the compaction job is failed.
413    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
414        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
415        self.remove_region_on_failure(region_id, err);
416    }
417
418    /// Notifies the scheduler that the region is dropped.
419    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
420        self.remove_region_on_failure(
421            region_id,
422            Arc::new(RegionDroppedSnafu { region_id }.build()),
423        );
424    }
425
426    /// Notifies the scheduler that the region is closed.
427    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
428        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
429    }
430
431    /// Notifies the scheduler that the region is truncated.
432    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
433        self.remove_region_on_failure(
434            region_id,
435            Arc::new(RegionTruncatedSnafu { region_id }.build()),
436        );
437    }
438
439    /// Add ddl request to pending queue.
440    ///
441    /// # Panics
442    /// Panics if region didn't request compaction.
443    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
444        debug!(
445            "Added pending DDL request for region: {}, ddl: {:?}",
446            request.region_id, request.request
447        );
448        let status = self.region_status.get_mut(&request.region_id).unwrap();
449        status.pending_ddl_requests.push(request);
450    }
451
452    #[cfg(test)]
453    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
454        let has_pending = self
455            .region_status
456            .get(&region_id)
457            .map(|status| !status.pending_ddl_requests.is_empty())
458            .unwrap_or(false);
459        debug!(
460            "Checked pending DDL requests for region: {}, has_pending: {}",
461            region_id, has_pending
462        );
463        has_pending
464    }
465
466    pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
467        let Some(status) = self.region_status.get_mut(&region_id) else {
468            return RequestCancelResult::NotRunning;
469        };
470
471        status.request_cancel()
472    }
473
474    /// Schedules a compaction request.
475    ///
476    /// Returns the active compaction state if the request is scheduled successfully.
477    /// Returns `None` if no compaction task can be scheduled for this region.
478    async fn schedule_compaction_request(
479        &mut self,
480        request: CompactionRequest,
481        options: compact_request::Options,
482    ) -> Result<Option<ActiveCompaction>> {
483        let region_id = request.region_id();
484        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
485            region_id,
486            &request.current_version.options,
487            &request.schema_metadata_manager,
488        )
489        .await
490        .unwrap_or_else(|e| {
491            warn!(e; "Failed to find dynamic options for region: {}", region_id);
492            (
493                request.current_version.options.compaction.clone(),
494                request.current_version.options.ttl.unwrap_or_default(),
495            )
496        });
497
498        let picker = new_picker(
499            &options,
500            &dynamic_compaction_opts,
501            request.current_version.options.append_mode,
502            Some(self.engine_config.max_background_compactions),
503        );
504        let region_id = request.region_id();
505        let CompactionRequest {
506            engine_config,
507            current_version,
508            access_layer,
509            request_sender,
510            waiters,
511            start_time,
512            cache_manager,
513            manifest_ctx,
514            listener,
515            schema_metadata_manager: _,
516            max_parallelism,
517        } = request;
518
519        debug!(
520            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
521            picker, region_id, ttl
522        );
523
524        let compaction_region = CompactionRegion {
525            region_id,
526            current_version: current_version.clone(),
527            region_options: RegionOptions {
528                compaction: dynamic_compaction_opts.clone(),
529                ..current_version.options.clone()
530            },
531            engine_config: engine_config.clone(),
532            region_metadata: current_version.metadata.clone(),
533            cache_manager: cache_manager.clone(),
534            access_layer: access_layer.clone(),
535            manifest_ctx: manifest_ctx.clone(),
536            file_purger: None,
537            ttl: Some(ttl),
538            max_parallelism,
539        };
540
541        let picker_output = {
542            let _pick_timer = COMPACTION_STAGE_ELAPSED
543                .with_label_values(&["pick"])
544                .start_timer();
545            picker.pick(&compaction_region)
546        };
547
548        let picker_output = if let Some(picker_output) = picker_output {
549            picker_output
550        } else {
551            // Nothing to compact, we are done. Notifies all waiters as we consume the compaction request.
552            for waiter in waiters {
553                waiter.send(Ok(0));
554            }
555            return Ok(None);
556        };
557
558        // If specified to run compaction remotely, we schedule the compaction job remotely.
559        // It will fall back to local compaction if there is no remote job scheduler.
560        let waiters = if dynamic_compaction_opts.remote_compaction() {
561            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
562                let remote_compaction_job = CompactionJob {
563                    compaction_region: compaction_region.clone(),
564                    picker_output: picker_output.clone(),
565                    start_time,
566                    waiters,
567                    ttl,
568                };
569
570                let result = remote_job_scheduler
571                    .schedule(
572                        RemoteJob::CompactionJob(remote_compaction_job),
573                        Box::new(DefaultNotifier {
574                            request_sender: request_sender.clone(),
575                        }),
576                    )
577                    .await;
578
579                match result {
580                    Ok(job_id) => {
581                        info!(
582                            "Scheduled remote compaction job {} for region {}",
583                            job_id, region_id
584                        );
585                        INFLIGHT_COMPACTION_COUNT.inc();
586                        return Ok(Some(ActiveCompaction::Remote));
587                    }
588                    Err(e) => {
589                        if !dynamic_compaction_opts.fallback_to_local() {
590                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
591                            return RemoteCompactionSnafu {
592                                region_id,
593                                job_id: None,
594                                reason: e.reason,
595                            }
596                            .fail();
597                        }
598
599                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
600
601                        // Return the waiters back to the caller for local compaction.
602                        e.waiters
603                    }
604                }
605            } else {
606                debug!(
607                    "Remote compaction is not enabled, fallback to local compaction for region {}",
608                    region_id
609                );
610                waiters
611            }
612        } else {
613            waiters
614        };
615
616        // Create a local compaction task.
617        let estimated_bytes = estimate_compaction_bytes(&picker_output);
618
619        let cancel_handle = Arc::new(CancellationHandle::default());
620        let state = LocalCompactionState::new(cancel_handle.clone());
621        let local_compaction_task = Box::new(CompactionTaskImpl {
622            state: state.clone(),
623            request_sender,
624            waiters,
625            start_time,
626            listener,
627            picker_output,
628            compaction_region,
629            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
630            memory_manager: self.memory_manager.clone(),
631            memory_policy: self.memory_policy,
632            estimated_memory_bytes: estimated_bytes,
633        });
634
635        self.submit_compaction_task(local_compaction_task, region_id)
636            .map(|_| Some(ActiveCompaction::Local { state }))
637    }
638
639    fn submit_compaction_task(
640        &mut self,
641        mut task: Box<CompactionTaskImpl>,
642        region_id: RegionId,
643    ) -> Result<()> {
644        self.scheduler
645            .schedule(Box::pin(async move {
646                INFLIGHT_COMPACTION_COUNT.inc();
647                task.run().await;
648                INFLIGHT_COMPACTION_COUNT.dec();
649            }))
650            .inspect_err(
651                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
652            )
653    }
654
655    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
656        // Remove this region.
657        let Some(status) = self.region_status.remove(&region_id) else {
658            return;
659        };
660
661        // Notifies all pending tasks.
662        status.on_failure(err);
663    }
664
665    fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
666        let Some(status) = self.region_status.remove(&region_id) else {
667            return Vec::new();
668        };
669
670        status.on_cancel()
671    }
672}
673
/// Shared state of a local compaction task, used to coordinate cancellation with commit.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle used to request cooperative cancellation of the task.
    cancel_handle: Arc<CancellationHandle>,
    /// Whether the task has started to commit. It is kept behind a mutex so that setting it and
    /// issuing a cancellation are serialized with respect to each other.
    commit_started: Arc<Mutex<bool>>,
}
679
/// The compaction currently active for a region.
#[derive(Debug)]
enum ActiveCompaction {
    /// A compaction task submitted to the local scheduler; `state` is used to cancel it.
    Local { state: LocalCompactionState },
    /// A compaction job handed to the remote job scheduler plugin.
    Remote,
}
685
686impl LocalCompactionState {
687    fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
688        Self {
689            cancel_handle,
690            commit_started: Arc::new(Mutex::new(false)),
691        }
692    }
693
694    /// Returns the cancellation handle for this compaction task.
695    pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
696        self.cancel_handle.clone()
697    }
698
699    /// Marks the compaction task as started to commit,
700    /// which means the compaction task is in the final stage and is about to update region version and manifest.
701    /// It will reject cancellation request after this method is called.
702    ///
703    /// Returns true if this is the first time to mark commit started, false otherwise.
704    pub(crate) fn mark_commit_started(&self) -> bool {
705        let mut commit_started = self.commit_started.lock().unwrap();
706        if self.cancel_handle.is_cancelled() {
707            return false;
708        }
709        *commit_started = true;
710        true
711    }
712
713    /// Request cancellation for this compaction task.
714    pub(crate) fn request_cancel(&self) -> RequestCancelResult {
715        // The cancel handle must under the lock of `commit_started` to avoid racing between cancellation and commit.
716        let commit_started = self.commit_started.lock().unwrap();
717        if *commit_started {
718            return RequestCancelResult::TooLateToCancel;
719        }
720        if self.cancel_handle.is_cancelled() {
721            return RequestCancelResult::AlreadyCancelling;
722        }
723
724        self.cancel_handle.cancel();
725        RequestCancelResult::CancelIssued
726    }
727}
728
/// Outcome of a request to cancel a region's compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    /// Cancellation was issued to the local compaction task.
    CancelIssued,
    /// Cancellation had already been requested for the task.
    AlreadyCancelling,
    /// The task has already started to commit, so it can no longer be cancelled.
    TooLateToCancel,
    /// No compaction status is tracked for the region (`CompactionStatus::request_cancel` may
    /// also return this in other cases — TODO confirm).
    NotRunning,
}
736
737impl Drop for CompactionScheduler {
738    fn drop(&mut self) {
739        for (region_id, status) in self.region_status.drain() {
740            // We are shutting down so notify all pending tasks.
741            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
742        }
743    }
744}
745
746/// Finds compaction options and TTL together with a single metadata fetch to reduce RTT.
747async fn find_dynamic_options(
748    region_id: RegionId,
749    region_options: &crate::region::options::RegionOptions,
750    schema_metadata_manager: &SchemaMetadataManagerRef,
751) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
752    let table_id = region_id.table_id();
753    if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
754        debug!(
755            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
756            table_id, region_options.compaction, region_options.ttl
757        );
758        return Ok((region_options.compaction.clone(), ttl));
759    }
760
761    let db_options = tokio::time::timeout(
762        crate::config::FETCH_OPTION_TIMEOUT,
763        schema_metadata_manager.get_schema_options_by_table_id(table_id),
764    )
765    .await
766    .context(TimeoutSnafu)?
767    .context(GetSchemaMetadataSnafu)?;
768
769    let ttl = if let Some(ttl) = region_options.ttl {
770        debug!(
771            "Use region TTL directly for table {}: ttl={:?}",
772            table_id, region_options.ttl
773        );
774        ttl
775    } else {
776        db_options
777            .as_ref()
778            .and_then(|options| options.ttl)
779            .unwrap_or_default()
780            .into()
781    };
782
783    let compaction = if !region_options.compaction_override {
784        if let Some(schema_opts) = db_options {
785            let map: HashMap<String, String> = schema_opts
786                .extra_options
787                .iter()
788                .filter_map(|(k, v)| {
789                    if k.starts_with("compaction.") {
790                        Some((k.clone(), v.clone()))
791                    } else {
792                        None
793                    }
794                })
795                .collect();
796            if map.is_empty() {
797                region_options.compaction.clone()
798            } else {
799                crate::region::options::RegionOptions::try_from_options(region_id, &map)
800                    .map(|o| o.compaction)
801                    .unwrap_or_else(|e| {
802                        error!(e; "Failed to create RegionOptions from map");
803                        region_options.compaction.clone()
804                    })
805            }
806        } else {
807            debug!(
808                "DB options is None for table {}, use region compaction: compaction={:?}",
809                table_id, region_options.compaction
810            );
811            region_options.compaction.clone()
812        }
813    } else {
814        debug!(
815            "No schema options for table {}, use region compaction: compaction={:?}",
816            table_id, region_options.compaction
817        );
818        region_options.compaction.clone()
819    };
820
821    debug!(
822        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
823        table_id, compaction, ttl
824    );
825    Ok((compaction, ttl))
826}
827
/// Status of running and pending region compaction tasks.
///
/// Tracks the active compaction of a region (if any) together with the waiters to notify,
/// the pending compaction request and the DDL requests that should run once compaction is
/// done.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters collected via `merge_waiter`. They are moved into the next request built by
    /// `new_compaction_request`, or sent an error by `on_failure` / `on_cancel`.
    waiters: Vec<OutputTx>,
    /// Pending compaction that is supposed to run as soon as the current compaction task
    /// finishes. At most one is kept: `set_pending_request` replaces an existing one and
    /// fails the replaced request's waiter.
    pending_request: Option<PendingCompaction>,
    /// Pending DDL requests that should run when compaction is done. `on_failure` sends them
    /// the compaction error; `on_cancel` hands them back to its caller.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// Active compaction state; `None` when no compaction task is running.
    active_compaction: Option<ActiveCompaction>,
}
845
846impl CompactionStatus {
847    /// Creates a new [CompactionStatus]
848    fn new(
849        region_id: RegionId,
850        version_control: VersionControlRef,
851        access_layer: AccessLayerRef,
852    ) -> CompactionStatus {
853        CompactionStatus {
854            region_id,
855            version_control,
856            access_layer,
857            waiters: Vec::new(),
858            pending_request: None,
859            pending_ddl_requests: Vec::new(),
860            active_compaction: None,
861        }
862    }
863
864    #[cfg(test)]
865    fn start_local_task(&mut self) -> LocalCompactionState {
866        let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
867        self.active_compaction = Some(ActiveCompaction::Local {
868            state: state.clone(),
869        });
870        state
871    }
872
873    #[cfg(test)]
874    fn start_remote_task(&mut self) {
875        self.active_compaction = Some(ActiveCompaction::Remote);
876    }
877
878    fn request_cancel(&mut self) -> RequestCancelResult {
879        let Some(active_compaction) = &self.active_compaction else {
880            return RequestCancelResult::NotRunning;
881        };
882
883        match active_compaction {
884            ActiveCompaction::Local { state, .. } => state.request_cancel(),
885            ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
886        }
887    }
888
889    fn clear_running_task(&mut self) -> bool {
890        self.active_compaction.take().is_some()
891    }
892
893    /// Merge the waiter to the pending compaction.
894    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
895        if let Some(waiter) = waiter.take_inner() {
896            self.waiters.push(waiter);
897        }
898    }
899
900    /// Set pending compaction request or replace current value if already exist.
901    fn set_pending_request(&mut self, pending: PendingCompaction) {
902        if let Some(prev) = self.pending_request.replace(pending) {
903            debug!(
904                "Replace pending compaction options with new request {:?} for region: {}",
905                prev.options, self.region_id
906            );
907            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
908        }
909    }
910
911    fn on_failure(mut self, err: Arc<Error>) {
912        for waiter in self.waiters.drain(..) {
913            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
914                region_id: self.region_id,
915            }));
916        }
917
918        if let Some(pending_compaction) = self.pending_request {
919            pending_compaction
920                .waiter
921                .send(Err(err.clone()).context(CompactRegionSnafu {
922                    region_id: self.region_id,
923                }));
924        }
925
926        for pending_ddl in self.pending_ddl_requests {
927            pending_ddl
928                .sender
929                .send(Err(err.clone()).context(CompactRegionSnafu {
930                    region_id: self.region_id,
931                }));
932        }
933    }
934
935    #[must_use]
936    fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
937        for waiter in self.waiters.drain(..) {
938            waiter.send(CompactionCancelledSnafu.fail());
939        }
940
941        if let Some(pending_compaction) = self.pending_request {
942            pending_compaction.waiter.send(
943                Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
944                    region_id: self.region_id,
945                }),
946            );
947        }
948
949        std::mem::take(&mut self.pending_ddl_requests)
950    }
951
952    /// Creates a new compaction request for compaction picker.
953    ///
954    /// It consumes all pending compaction waiters.
955    #[allow(clippy::too_many_arguments)]
956    fn new_compaction_request(
957        &mut self,
958        request_sender: Sender<WorkerRequestWithTime>,
959        mut waiter: OptionOutputTx,
960        engine_config: Arc<MitoConfig>,
961        cache_manager: CacheManagerRef,
962        manifest_ctx: &ManifestContextRef,
963        listener: WorkerListener,
964        schema_metadata_manager: SchemaMetadataManagerRef,
965        max_parallelism: usize,
966    ) -> CompactionRequest {
967        let current_version = CompactionVersion::from(self.version_control.current().version);
968        let start_time = Instant::now();
969        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
970        waiters.extend(std::mem::take(&mut self.waiters));
971
972        if let Some(waiter) = waiter.take_inner() {
973            waiters.push(waiter);
974        }
975
976        CompactionRequest {
977            engine_config,
978            current_version,
979            access_layer: self.access_layer.clone(),
980            request_sender: request_sender.clone(),
981            waiters,
982            start_time,
983            cache_manager,
984            manifest_ctx: manifest_ctx.clone(),
985            listener,
986            schema_metadata_manager,
987            max_parallelism,
988        }
989    }
990}
991
/// A single compaction output: the input files to merge and how the merged result is
/// written (output level, deletion handling and optional time range).
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Compaction output file level.
    pub output_level: Level,
    /// Compaction input files.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers.
    pub filter_deleted: bool,
    /// Compaction output time range. Only windowed compaction specifies output time range.
    pub output_time_range: Option<TimestampRange>,
}
1003
/// A serializable form of [CompactionOutput] in which each input [FileHandle] is replaced by
/// its [FileMeta].
// NOTE(review): presumably used to ship compaction outputs across process boundaries
// (e.g. remote compaction). Confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Compaction output file level.
    output_level: Level,
    /// Metadata of the compaction input files.
    inputs: Vec<FileMeta>,
    /// Whether to remove deletion markers.
    filter_deleted: bool,
    /// Compaction output time range, if any.
    output_time_range: Option<TimestampRange>,
}
1012
/// Builder that reads the input SSTs of a compaction and exposes them as a [FlatSource]
/// (see `build_flat_sst_reader`).
struct CompactionSstReaderBuilder<'a> {
    /// Region metadata. All of its columns are projected, and its time index column is used
    /// for the time-range predicate.
    metadata: RegionMetadataRef,
    /// Access layer used to read the SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager, accessed through `CacheStrategy::Compaction`.
    cache: CacheManagerRef,
    /// SST files to read.
    inputs: &'a [FileHandle],
    /// Append mode passed to the scan.
    append_mode: bool,
    /// Whether the scan filters deleted rows (passed to `with_filter_deleted`).
    filter_deleted: bool,
    /// Optional time range; rows outside it are filtered out by a predicate.
    time_range: Option<TimestampRange>,
    /// Merge mode passed to the scan.
    merge_mode: MergeMode,
}
1024
1025impl CompactionSstReaderBuilder<'_> {
1026    /// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
1027    /// for compaction. The schema of the [FlatSource] is unified.
1028    async fn build_flat_sst_reader(self) -> Result<FlatSource> {
1029        let scan_input = self.build_scan_input().await?.with_compaction(true);
1030
1031        let schema = scan_input.mapper.output_schema();
1032        let schema = schema.arrow_schema();
1033
1034        let stream = SeqScan::new(scan_input)
1035            .build_flat_reader_for_compaction()
1036            .await?;
1037        Ok(FlatSource::new_stream(schema.clone(), stream))
1038    }
1039
1040    async fn build_scan_input(self) -> Result<ScanInput> {
1041        let schema = self.metadata.schema.arrow_schema();
1042        let json_type_hint = if schema.has_json_extension_field() {
1043            let mut json_type_hint = schema
1044                .fields()
1045                .iter()
1046                .filter(|&field| is_json_extension_type(field))
1047                .map(|field| (field.name().clone(), JsonNativeType::Null))
1048                .collect::<HashMap<_, _>>();
1049
1050            let schemas = self.collect_arrow_schemas_from_parquet().await?;
1051            for schema in schemas {
1052                for field in schema.fields() {
1053                    let Some(merged) = json_type_hint.get_mut(field.name()) else {
1054                        continue;
1055                    };
1056
1057                    let json_type = JsonNativeType::try_from(field.data_type())
1058                        .context(DataTypeMismatchSnafu)?;
1059                    merged.merge(&json_type);
1060                }
1061            }
1062
1063            Some(json_type_hint)
1064        } else {
1065            None
1066        };
1067
1068        let projection = (0..self.metadata.column_metadatas.len()).collect();
1069        let read_columns = ReadColumns::from_deduped_column_ids(
1070            self.metadata.column_metadatas.iter().map(|x| x.column_id),
1071        );
1072        let mapper = FlatProjectionMapper::new_with_read_columns(
1073            &self.metadata,
1074            projection,
1075            read_columns,
1076            json_type_hint.as_ref(),
1077        )?;
1078
1079        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
1080            .with_files(self.inputs.to_vec())
1081            .with_append_mode(self.append_mode)
1082            // We use special cache strategy for compaction.
1083            .with_cache(CacheStrategy::Compaction(self.cache))
1084            .with_filter_deleted(self.filter_deleted)
1085            // We ignore file not found error during compaction.
1086            .with_ignore_file_not_found(true)
1087            .with_merge_mode(self.merge_mode);
1088
1089        // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
1090        // by converting time ranges into predicate.
1091        if let Some(time_range) = self.time_range {
1092            scan_input =
1093                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
1094        }
1095
1096        Ok(scan_input)
1097    }
1098
1099    async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
1100        let mut schemas = Vec::with_capacity(self.inputs.len());
1101
1102        for file_handle in self.inputs {
1103            let file_path =
1104                file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
1105            let file_size = file_handle.meta_ref().file_size;
1106            let parquet_metadata = match self
1107                .sst_layer
1108                .read_sst(file_handle.clone())
1109                .cache(CacheStrategy::Compaction(self.cache.clone()))
1110                .read_parquet_metadata(
1111                    &file_path,
1112                    file_size,
1113                    &mut MetadataCacheMetrics::default(),
1114                    PageIndexPolicy::default(),
1115                )
1116                .await
1117                .map(|x| x.0.parquet_metadata())
1118            {
1119                Ok(x) => x,
1120                Err(e) if e.is_object_not_found() => continue,
1121                Err(e) => return Err(e),
1122            };
1123            let file_metadata = parquet_metadata.file_metadata();
1124
1125            let schema = parquet_to_arrow_schema(
1126                file_metadata.schema_descr(),
1127                file_metadata.key_value_metadata(),
1128            )
1129            .context(ParquetToArrowSchemaSnafu { file: file_path })?;
1130
1131            schemas.push(schema);
1132        }
1133        Ok(schemas)
1134    }
1135}
1136
1137/// Converts time range to predicates so that rows outside the range will be filtered.
1138fn time_range_to_predicate(
1139    range: TimestampRange,
1140    metadata: &RegionMetadataRef,
1141) -> Result<PredicateGroup> {
1142    let ts_col = metadata.time_index_column();
1143
1144    // safety: time index column's type must be a valid timestamp type.
1145    let ts_col_unit = ts_col
1146        .column_schema
1147        .data_type
1148        .as_timestamp()
1149        .unwrap()
1150        .unit();
1151
1152    let exprs = match (range.start(), range.end()) {
1153        (Some(start), Some(end)) => {
1154            vec![
1155                datafusion_expr::col(ts_col.column_schema.name.clone())
1156                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1157                datafusion_expr::col(ts_col.column_schema.name.clone())
1158                    .lt(ts_to_lit(*end, ts_col_unit)?),
1159            ]
1160        }
1161        (Some(start), None) => {
1162            vec![
1163                datafusion_expr::col(ts_col.column_schema.name.clone())
1164                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1165            ]
1166        }
1167
1168        (None, Some(end)) => {
1169            vec![
1170                datafusion_expr::col(ts_col.column_schema.name.clone())
1171                    .lt(ts_to_lit(*end, ts_col_unit)?),
1172            ]
1173        }
1174        (None, None) => {
1175            return Ok(PredicateGroup::default());
1176        }
1177    };
1178
1179    let predicate = PredicateGroup::new(metadata, &exprs)?;
1180    Ok(predicate)
1181}
1182
1183fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
1184    let ts = ts
1185        .convert_to(ts_col_unit)
1186        .context(TimeRangePredicateOverflowSnafu {
1187            timestamp: ts,
1188            unit: ts_col_unit,
1189        })?;
1190    let val = ts.value();
1191    let scalar_value = match ts_col_unit {
1192        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
1193        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
1194        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
1195        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
1196    };
1197    Ok(datafusion_expr::lit(scalar_value))
1198}
1199
1200/// Finds all expired SSTs across levels.
1201fn get_expired_ssts(
1202    levels: &[LevelMeta],
1203    ttl: Option<TimeToLive>,
1204    now: Timestamp,
1205) -> Vec<FileHandle> {
1206    let Some(ttl) = ttl else {
1207        return vec![];
1208    };
1209
1210    levels
1211        .iter()
1212        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
1213        .collect()
1214}
1215
1216/// Estimates compaction memory as the sum of all input files' maximum row-group
1217/// uncompressed sizes.
1218fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
1219    picker_output
1220        .outputs
1221        .iter()
1222        .flat_map(|output| output.inputs.iter())
1223        .map(|file: &FileHandle| {
1224            let meta = file.meta_ref();
1225            meta.max_row_group_uncompressed_size
1226        })
1227        .sum()
1228}
1229
/// Pending compaction request that is supposed to run after the current task is finished,
/// typically used for manual compactions.
///
/// A [CompactionStatus] keeps at most one of these. Setting a new one replaces the previous
/// request and fails its waiter.
struct PendingCompaction {
    /// Compaction options. Currently, it can only be [StrictWindow].
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request. [CompactionStatus] sends it an error if the request is
    /// overridden, or if the running compaction fails or is cancelled.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for pending compaction.
    pub(crate) max_parallelism: usize,
}
1240
1241#[cfg(test)]
1242mod tests {
1243    use std::assert_matches;
1244    use std::time::Duration;
1245
1246    use api::v1::region::StrictWindow;
1247    use common_datasource::compression::CompressionType;
1248    use common_meta::key::schema_name::SchemaNameValue;
1249    use common_time::DatabaseTimeToLive;
1250    use tokio::sync::{Barrier, oneshot};
1251
1252    use super::*;
1253    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
1254    use crate::error::InvalidSchedulerStateSnafu;
1255    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1256    use crate::region::ManifestContext;
1257    use crate::schedule::scheduler::{Job, Scheduler};
1258    use crate::sst::FormatType;
1259    use crate::test_util::mock_schema_metadata_manager;
1260    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1261    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1262
1263    struct FailingScheduler;
1264
1265    #[async_trait::async_trait]
1266    impl Scheduler for FailingScheduler {
1267        fn schedule(&self, _job: Job) -> Result<()> {
1268            InvalidSchedulerStateSnafu.fail()
1269        }
1270
1271        async fn stop(&self, _await_termination: bool) -> Result<()> {
1272            Ok(())
1273        }
1274    }
1275
1276    #[tokio::test]
1277    async fn test_find_compaction_options_db_level() {
1278        let env = SchedulerEnv::new().await;
1279        let builder = VersionControlBuilder::new();
1280        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1281        let region_id = builder.region_id();
1282        let table_id = region_id.table_id();
1283        // Register table without ttl but with db-level compaction options
1284        let mut schema_value = SchemaNameValue {
1285            ttl: Some(DatabaseTimeToLive::default()),
1286            ..Default::default()
1287        };
1288        schema_value
1289            .extra_options
1290            .insert("compaction.type".to_string(), "twcs".to_string());
1291        schema_value
1292            .extra_options
1293            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
1294        schema_metadata_manager
1295            .register_region_table_info(
1296                table_id,
1297                "t",
1298                "c",
1299                "s",
1300                Some(schema_value),
1301                kv_backend.clone(),
1302            )
1303            .await;
1304
1305        let version_control = Arc::new(builder.build());
1306        let region_opts = version_control.current().version.options.clone();
1307        let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
1308            .await
1309            .unwrap();
1310        match opts {
1311            crate::region::options::CompactionOptions::Twcs(t) => {
1312                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
1313            }
1314        }
1315        let manifest_ctx = env
1316            .mock_manifest_context(version_control.current().version.metadata.clone())
1317            .await;
1318        let (tx, _rx) = mpsc::channel(4);
1319        let mut scheduler = env.mock_compaction_scheduler(tx);
1320        let (otx, _orx) = oneshot::channel();
1321        let request = scheduler
1322            .region_status
1323            .entry(region_id)
1324            .or_insert_with(|| {
1325                crate::compaction::CompactionStatus::new(
1326                    region_id,
1327                    version_control.clone(),
1328                    env.access_layer.clone(),
1329                )
1330            })
1331            .new_compaction_request(
1332                scheduler.request_sender.clone(),
1333                OptionOutputTx::new(Some(OutputTx::new(otx))),
1334                scheduler.engine_config.clone(),
1335                scheduler.cache_manager.clone(),
1336                &manifest_ctx,
1337                scheduler.listener.clone(),
1338                schema_metadata_manager.clone(),
1339                1,
1340            );
1341        scheduler
1342            .schedule_compaction_request(
1343                request,
1344                compact_request::Options::Regular(Default::default()),
1345            )
1346            .await
1347            .unwrap();
1348    }
1349
1350    #[tokio::test]
1351    async fn test_find_compaction_options_priority() {
1352        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
1353            let mut schema_value = SchemaNameValue {
1354                ttl: Some(DatabaseTimeToLive::default()),
1355                ..Default::default()
1356            };
1357            schema_value
1358                .extra_options
1359                .insert("compaction.type".to_string(), "twcs".to_string());
1360            schema_value.extra_options.insert(
1361                "compaction.twcs.time_window".to_string(),
1362                time_window.to_string(),
1363            );
1364            schema_value
1365        }
1366
1367        let cases = [
1368            (
1369                "db options set and table override set",
1370                Some(schema_value_with_twcs("2h")),
1371                true,
1372                Some(Duration::from_secs(5 * 3600)),
1373                Some(5 * 3600),
1374            ),
1375            (
1376                "db options set and table override not set",
1377                Some(schema_value_with_twcs("2h")),
1378                false,
1379                None,
1380                Some(2 * 3600),
1381            ),
1382            (
1383                "db options not set and table override set",
1384                None,
1385                true,
1386                Some(Duration::from_secs(4 * 3600)),
1387                Some(4 * 3600),
1388            ),
1389            (
1390                "db options not set and table override not set",
1391                None,
1392                false,
1393                None,
1394                None,
1395            ),
1396        ];
1397
1398        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
1399            let builder = VersionControlBuilder::new();
1400            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1401            let region_id = builder.region_id();
1402            let table_id = region_id.table_id();
1403            schema_metadata_manager
1404                .register_region_table_info(
1405                    table_id,
1406                    "t",
1407                    "c",
1408                    "s",
1409                    schema_value,
1410                    kv_backend.clone(),
1411                )
1412                .await;
1413
1414            let version_control = Arc::new(builder.build());
1415            let mut region_opts = version_control.current().version.options.clone();
1416            region_opts.compaction_override = override_set;
1417            if let Some(window) = table_window {
1418                let crate::region::options::CompactionOptions::Twcs(twcs) =
1419                    &mut region_opts.compaction;
1420                twcs.time_window = Some(window);
1421            }
1422
1423            let (opts, _) = find_dynamic_options(region_id, &region_opts, &schema_metadata_manager)
1424                .await
1425                .unwrap();
1426            match opts {
1427                crate::region::options::CompactionOptions::Twcs(t) => {
1428                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
1429                }
1430            }
1431        }
1432    }
1433
1434    #[tokio::test]
1435    async fn test_schedule_empty() {
1436        let env = SchedulerEnv::new().await;
1437        let (tx, _rx) = mpsc::channel(4);
1438        let mut scheduler = env.mock_compaction_scheduler(tx);
1439        let mut builder = VersionControlBuilder::new();
1440        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1441        schema_metadata_manager
1442            .register_region_table_info(
1443                builder.region_id().table_id(),
1444                "test_table",
1445                "test_catalog",
1446                "test_schema",
1447                None,
1448                kv_backend,
1449            )
1450            .await;
1451        // Nothing to compact.
1452        let version_control = Arc::new(builder.build());
1453        let (output_tx, output_rx) = oneshot::channel();
1454        let waiter = OptionOutputTx::from(output_tx);
1455        let manifest_ctx = env
1456            .mock_manifest_context(version_control.current().version.metadata.clone())
1457            .await;
1458        let scheduled = scheduler
1459            .schedule_compaction(
1460                builder.region_id(),
1461                compact_request::Options::Regular(Default::default()),
1462                &version_control,
1463                &env.access_layer,
1464                waiter,
1465                &manifest_ctx,
1466                schema_metadata_manager.clone(),
1467                1,
1468            )
1469            .await
1470            .unwrap();
1471        assert!(!scheduled);
1472        let output = output_rx.await.unwrap().unwrap();
1473        assert_eq!(output, 0);
1474        assert!(scheduler.region_status.is_empty());
1475
1476        // Only one file, picker won't compact it.
1477        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
1478        let (output_tx, output_rx) = oneshot::channel();
1479        let waiter = OptionOutputTx::from(output_tx);
1480        let scheduled = scheduler
1481            .schedule_compaction(
1482                builder.region_id(),
1483                compact_request::Options::Regular(Default::default()),
1484                &version_control,
1485                &env.access_layer,
1486                waiter,
1487                &manifest_ctx,
1488                schema_metadata_manager,
1489                1,
1490            )
1491            .await
1492            .unwrap();
1493        assert!(!scheduled);
1494        let output = output_rx.await.unwrap().unwrap();
1495        assert_eq!(output, 0);
1496        assert!(scheduler.region_status.is_empty());
1497    }
1498
1499    #[tokio::test]
1500    async fn test_schedule_compaction_returns_true_when_task_scheduled() {
1501        let job_scheduler = Arc::new(VecScheduler::default());
1502        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1503        let (tx, _rx) = mpsc::channel(4);
1504        let mut scheduler = env.mock_compaction_scheduler(tx);
1505        let mut builder = VersionControlBuilder::new();
1506        let region_id = builder.region_id();
1507        let end = 1000 * 1000;
1508        // Five overlapping L0 files are enough for the regular picker to create a task.
1509        let version_control = Arc::new(
1510            builder
1511                .push_l0_file(0, end)
1512                .push_l0_file(10, end)
1513                .push_l0_file(50, end)
1514                .push_l0_file(80, end)
1515                .push_l0_file(90, end)
1516                .build(),
1517        );
1518        let manifest_ctx = env
1519            .mock_manifest_context(version_control.current().version.metadata.clone())
1520            .await;
1521        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1522        schema_metadata_manager
1523            .register_region_table_info(
1524                region_id.table_id(),
1525                "test_table",
1526                "test_catalog",
1527                "test_schema",
1528                None,
1529                kv_backend,
1530            )
1531            .await;
1532
1533        let scheduled = scheduler
1534            .schedule_compaction(
1535                region_id,
1536                Options::Regular(Default::default()),
1537                &version_control,
1538                &env.access_layer,
1539                OptionOutputTx::none(),
1540                &manifest_ctx,
1541                schema_metadata_manager,
1542                1,
1543            )
1544            .await
1545            .unwrap();
1546
1547        // The boolean result is what the worker uses to decide whether to update
1548        // last_schedule_compaction_millis.
1549        assert!(scheduled);
1550        assert_eq!(1, job_scheduler.num_jobs());
1551        assert!(scheduler.region_status.contains_key(&region_id));
1552    }
1553
1554    #[tokio::test]
1555    async fn test_schedule_on_finished() {
1556        common_telemetry::init_default_ut_logging();
1557        let job_scheduler = Arc::new(VecScheduler::default());
1558        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1559        let (tx, _rx) = mpsc::channel(4);
1560        let mut scheduler = env.mock_compaction_scheduler(tx);
1561        let mut builder = VersionControlBuilder::new();
1562        let purger = builder.file_purger();
1563        let region_id = builder.region_id();
1564
1565        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1566        schema_metadata_manager
1567            .register_region_table_info(
1568                builder.region_id().table_id(),
1569                "test_table",
1570                "test_catalog",
1571                "test_schema",
1572                None,
1573                kv_backend,
1574            )
1575            .await;
1576
1577        // 5 files to compact.
1578        let end = 1000 * 1000;
1579        let version_control = Arc::new(
1580            builder
1581                .push_l0_file(0, end)
1582                .push_l0_file(10, end)
1583                .push_l0_file(50, end)
1584                .push_l0_file(80, end)
1585                .push_l0_file(90, end)
1586                .build(),
1587        );
1588        let manifest_ctx = env
1589            .mock_manifest_context(version_control.current().version.metadata.clone())
1590            .await;
1591        let scheduled = scheduler
1592            .schedule_compaction(
1593                region_id,
1594                compact_request::Options::Regular(Default::default()),
1595                &version_control,
1596                &env.access_layer,
1597                OptionOutputTx::none(),
1598                &manifest_ctx,
1599                schema_metadata_manager.clone(),
1600                1,
1601            )
1602            .await
1603            .unwrap();
1604        // Should schedule 1 compaction.
1605        assert!(scheduled);
1606        assert_eq!(1, scheduler.region_status.len());
1607        assert_eq!(1, job_scheduler.num_jobs());
1608        let data = version_control.current();
1609        let file_metas: Vec<_> = data.version.ssts.levels()[0]
1610            .files
1611            .values()
1612            .map(|file| file.meta_ref().clone())
1613            .collect();
1614
1615        // 5 files for next compaction and removes old files.
1616        apply_edit(
1617            &version_control,
1618            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1619            &file_metas,
1620            purger.clone(),
1621        );
1622        // The task is pending.
1623        let (tx, _rx) = oneshot::channel();
1624        let scheduled = scheduler
1625            .schedule_compaction(
1626                region_id,
1627                compact_request::Options::Regular(Default::default()),
1628                &version_control,
1629                &env.access_layer,
1630                OptionOutputTx::new(Some(OutputTx::new(tx))),
1631                &manifest_ctx,
1632                schema_metadata_manager.clone(),
1633                1,
1634            )
1635            .await
1636            .unwrap();
1637        assert!(!scheduled);
1638        assert_eq!(1, scheduler.region_status.len());
1639        assert_eq!(1, job_scheduler.num_jobs());
1640        assert!(
1641            !scheduler
1642                .region_status
1643                .get(&builder.region_id())
1644                .unwrap()
1645                .waiters
1646                .is_empty()
1647        );
1648
1649        // On compaction finished and schedule next compaction.
1650        scheduler
1651            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1652            .await;
1653        let scheduled = scheduler
1654            .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager.clone())
1655            .await;
1656        assert!(scheduled);
1657        assert_eq!(1, scheduler.region_status.len());
1658        assert_eq!(2, job_scheduler.num_jobs());
1659
1660        // 5 files for next compaction.
1661        apply_edit(
1662            &version_control,
1663            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1664            &[],
1665            purger.clone(),
1666        );
1667        let (tx, _rx) = oneshot::channel();
1668        // The task is pending.
1669        let scheduled = scheduler
1670            .schedule_compaction(
1671                region_id,
1672                compact_request::Options::Regular(Default::default()),
1673                &version_control,
1674                &env.access_layer,
1675                OptionOutputTx::new(Some(OutputTx::new(tx))),
1676                &manifest_ctx,
1677                schema_metadata_manager,
1678                1,
1679            )
1680            .await
1681            .unwrap();
1682        assert!(!scheduled);
1683        assert_eq!(2, job_scheduler.num_jobs());
1684        assert!(
1685            !scheduler
1686                .region_status
1687                .get(&builder.region_id())
1688                .unwrap()
1689                .waiters
1690                .is_empty()
1691        );
1692    }
1693
1694    #[tokio::test]
1695    async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
1696        common_telemetry::init_default_ut_logging();
1697        let env = SchedulerEnv::new()
1698            .await
1699            .scheduler(Arc::new(FailingScheduler));
1700        let (tx, _rx) = mpsc::channel(4);
1701        let mut scheduler = env.mock_compaction_scheduler(tx);
1702        let mut builder = VersionControlBuilder::new();
1703        let end = 1000 * 1000;
1704        let version_control = Arc::new(
1705            builder
1706                .push_l0_file(0, end)
1707                .push_l0_file(10, end)
1708                .push_l0_file(50, end)
1709                .push_l0_file(80, end)
1710                .push_l0_file(90, end)
1711                .build(),
1712        );
1713        let region_id = builder.region_id();
1714        let manifest_ctx = env
1715            .mock_manifest_context(version_control.current().version.metadata.clone())
1716            .await;
1717        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1718        schema_metadata_manager
1719            .register_region_table_info(
1720                builder.region_id().table_id(),
1721                "test_table",
1722                "test_catalog",
1723                "test_schema",
1724                None,
1725                kv_backend,
1726            )
1727            .await;
1728
1729        let result = scheduler
1730            .schedule_compaction(
1731                region_id,
1732                compact_request::Options::Regular(Default::default()),
1733                &version_control,
1734                &env.access_layer,
1735                OptionOutputTx::none(),
1736                &manifest_ctx,
1737                schema_metadata_manager,
1738                1,
1739            )
1740            .await;
1741
1742        assert!(result.is_err());
1743        assert!(!scheduler.region_status.contains_key(&region_id));
1744    }
1745
1746    #[tokio::test]
1747    async fn test_manual_compaction_when_compaction_in_progress() {
1748        common_telemetry::init_default_ut_logging();
1749        let job_scheduler = Arc::new(VecScheduler::default());
1750        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1751        let (tx, _rx) = mpsc::channel(4);
1752        let mut scheduler = env.mock_compaction_scheduler(tx);
1753        let mut builder = VersionControlBuilder::new();
1754        let purger = builder.file_purger();
1755        let region_id = builder.region_id();
1756
1757        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1758        schema_metadata_manager
1759            .register_region_table_info(
1760                builder.region_id().table_id(),
1761                "test_table",
1762                "test_catalog",
1763                "test_schema",
1764                None,
1765                kv_backend,
1766            )
1767            .await;
1768
1769        // 5 files to compact.
1770        let end = 1000 * 1000;
1771        let version_control = Arc::new(
1772            builder
1773                .push_l0_file(0, end)
1774                .push_l0_file(10, end)
1775                .push_l0_file(50, end)
1776                .push_l0_file(80, end)
1777                .push_l0_file(90, end)
1778                .build(),
1779        );
1780        let manifest_ctx = env
1781            .mock_manifest_context(version_control.current().version.metadata.clone())
1782            .await;
1783
1784        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
1785            .files
1786            .values()
1787            .map(|file| file.meta_ref().clone())
1788            .collect();
1789
1790        // 5 files for next compaction and removes old files.
1791        apply_edit(
1792            &version_control,
1793            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1794            &file_metas,
1795            purger.clone(),
1796        );
1797
1798        scheduler
1799            .schedule_compaction(
1800                region_id,
1801                compact_request::Options::Regular(Default::default()),
1802                &version_control,
1803                &env.access_layer,
1804                OptionOutputTx::none(),
1805                &manifest_ctx,
1806                schema_metadata_manager.clone(),
1807                1,
1808            )
1809            .await
1810            .unwrap();
1811        // Should schedule 1 compaction.
1812        assert_eq!(1, scheduler.region_status.len());
1813        assert_eq!(1, job_scheduler.num_jobs());
1814        assert!(
1815            scheduler
1816                .region_status
1817                .get(&region_id)
1818                .unwrap()
1819                .pending_request
1820                .is_none()
1821        );
1822
1823        // Schedule another manual compaction.
1824        let (tx, _rx) = oneshot::channel();
1825        scheduler
1826            .schedule_compaction(
1827                region_id,
1828                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
1829                &version_control,
1830                &env.access_layer,
1831                OptionOutputTx::new(Some(OutputTx::new(tx))),
1832                &manifest_ctx,
1833                schema_metadata_manager.clone(),
1834                1,
1835            )
1836            .await
1837            .unwrap();
1838        assert_eq!(1, scheduler.region_status.len());
1839        // Current job num should be 1 since compaction is in progress.
1840        assert_eq!(1, job_scheduler.num_jobs());
1841        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1842        assert!(status.pending_request.is_some());
1843
1844        // On compaction finished and schedule next compaction.
1845        scheduler
1846            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1847            .await;
1848        assert_eq!(1, scheduler.region_status.len());
1849        assert_eq!(2, job_scheduler.num_jobs());
1850
1851        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1852        assert!(status.pending_request.is_none());
1853    }
1854
1855    #[tokio::test]
1856    async fn test_compaction_bypass_in_staging_mode() {
1857        let env = SchedulerEnv::new().await;
1858        let (tx, _rx) = mpsc::channel(4);
1859        let mut scheduler = env.mock_compaction_scheduler(tx);
1860
1861        // Create version control and manifest context for staging mode
1862        let builder = VersionControlBuilder::new();
1863        let version_control = Arc::new(builder.build());
1864        let region_id = version_control.current().version.metadata.region_id;
1865
1866        // Create staging manifest context using the same pattern as SchedulerEnv
1867        let staging_manifest_ctx = {
1868            let manager = RegionManifestManager::new(
1869                version_control.current().version.metadata.clone(),
1870                0,
1871                RegionManifestOptions {
1872                    manifest_dir: "".to_string(),
1873                    object_store: env.access_layer.object_store().clone(),
1874                    compress_type: CompressionType::Uncompressed,
1875                    checkpoint_distance: 10,
1876                    remove_file_options: Default::default(),
1877                    manifest_cache: None,
1878                },
1879                FormatType::PrimaryKey,
1880                &Default::default(),
1881            )
1882            .await
1883            .unwrap();
1884            Arc::new(ManifestContext::new(
1885                manager,
1886                RegionRoleState::Leader(RegionLeaderState::Staging),
1887            ))
1888        };
1889
1890        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1891
1892        // Test regular compaction bypass in staging mode
1893        let (tx, rx) = oneshot::channel();
1894        scheduler
1895            .schedule_compaction(
1896                region_id,
1897                compact_request::Options::Regular(Default::default()),
1898                &version_control,
1899                &env.access_layer,
1900                OptionOutputTx::new(Some(OutputTx::new(tx))),
1901                &staging_manifest_ctx,
1902                schema_metadata_manager,
1903                1,
1904            )
1905            .await
1906            .unwrap();
1907
1908        let result = rx.await.unwrap();
1909        assert_eq!(result.unwrap(), 0); // is there a better way to check this?
1910        assert_eq!(0, scheduler.region_status.len());
1911    }
1912
1913    #[tokio::test]
1914    async fn test_add_ddl_request_to_pending() {
1915        let env = SchedulerEnv::new().await;
1916        let (tx, _rx) = mpsc::channel(4);
1917        let mut scheduler = env.mock_compaction_scheduler(tx);
1918        let builder = VersionControlBuilder::new();
1919        let version_control = Arc::new(builder.build());
1920        let region_id = builder.region_id();
1921
1922        scheduler.region_status.insert(
1923            region_id,
1924            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1925        );
1926        scheduler
1927            .region_status
1928            .get_mut(&region_id)
1929            .unwrap()
1930            .start_local_task();
1931
1932        let (output_tx, _output_rx) = oneshot::channel();
1933        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1934            region_id,
1935            sender: OptionOutputTx::from(output_tx),
1936            request: crate::request::DdlRequest::EnterStaging(
1937                store_api::region_request::EnterStagingRequest {
1938                    partition_directive:
1939                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1940                },
1941            ),
1942        });
1943
1944        assert!(scheduler.has_pending_ddls(region_id));
1945    }
1946
1947    #[tokio::test]
1948    async fn test_request_cancel_state_transitions() {
1949        let env = SchedulerEnv::new().await;
1950        let builder = VersionControlBuilder::new();
1951        let region_id = builder.region_id();
1952        let version_control = Arc::new(builder.build());
1953        let mut status =
1954            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1955        let state = status.start_local_task();
1956
1957        assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
1958        assert!(state.cancel_handle().is_cancelled());
1959        assert_eq!(
1960            status.request_cancel(),
1961            RequestCancelResult::AlreadyCancelling
1962        );
1963
1964        assert!(!state.mark_commit_started());
1965        assert_eq!(
1966            status.request_cancel(),
1967            RequestCancelResult::AlreadyCancelling
1968        );
1969
1970        assert!(status.clear_running_task());
1971        assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
1972    }
1973
1974    #[tokio::test]
1975    async fn test_request_cancel_remote_compaction_is_too_late() {
1976        let env = SchedulerEnv::new().await;
1977        let builder = VersionControlBuilder::new();
1978        let region_id = builder.region_id();
1979        let version_control = Arc::new(builder.build());
1980        let mut status =
1981            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1982
1983        status.start_remote_task();
1984
1985        assert_eq!(
1986            status.request_cancel(),
1987            RequestCancelResult::TooLateToCancel
1988        );
1989        assert!(status.active_compaction.is_some());
1990    }
1991
1992    #[tokio::test]
1993    async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
1994        let job_scheduler = Arc::new(VecScheduler::default());
1995        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1996        let (tx, _rx) = mpsc::channel(4);
1997        let mut scheduler = env.mock_compaction_scheduler(tx);
1998        let builder = VersionControlBuilder::new();
1999        let version_control = Arc::new(builder.build());
2000        let region_id = builder.region_id();
2001        let _manifest_ctx = env
2002            .mock_manifest_context(version_control.current().version.metadata.clone())
2003            .await;
2004        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2005
2006        scheduler.region_status.insert(
2007            region_id,
2008            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2009        );
2010        scheduler
2011            .region_status
2012            .get_mut(&region_id)
2013            .unwrap()
2014            .start_local_task();
2015
2016        let (output_tx, _output_rx) = oneshot::channel();
2017        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2018            region_id,
2019            sender: OptionOutputTx::from(output_tx),
2020            request: crate::request::DdlRequest::EnterStaging(
2021                store_api::region_request::EnterStagingRequest {
2022                    partition_directive:
2023                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2024                },
2025            ),
2026        });
2027
2028        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2029
2030        assert_eq!(pending_ddls.len(), 1);
2031        assert!(!scheduler.has_pending_ddls(region_id));
2032        assert!(!scheduler.region_status.contains_key(&region_id));
2033        assert_eq!(job_scheduler.num_jobs(), 0);
2034    }
2035
2036    #[tokio::test]
2037    async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
2038        let job_scheduler = Arc::new(VecScheduler::default());
2039        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2040        let (tx, _rx) = mpsc::channel(4);
2041        let mut scheduler = env.mock_compaction_scheduler(tx);
2042        let builder = VersionControlBuilder::new();
2043        let version_control = Arc::new(builder.build());
2044        let region_id = builder.region_id();
2045        let _manifest_ctx = env
2046            .mock_manifest_context(version_control.current().version.metadata.clone())
2047            .await;
2048        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2049
2050        scheduler.region_status.insert(
2051            region_id,
2052            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2053        );
2054        let status = scheduler.region_status.get_mut(&region_id).unwrap();
2055        status.start_local_task();
2056        let (manual_tx, manual_rx) = oneshot::channel();
2057        status.set_pending_request(PendingCompaction {
2058            options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
2059            waiter: OptionOutputTx::from(manual_tx),
2060            max_parallelism: 1,
2061        });
2062
2063        let (output_tx, _output_rx) = oneshot::channel();
2064        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2065            region_id,
2066            sender: OptionOutputTx::from(output_tx),
2067            request: crate::request::DdlRequest::EnterStaging(
2068                store_api::region_request::EnterStagingRequest {
2069                    partition_directive:
2070                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2071                },
2072            ),
2073        });
2074
2075        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2076
2077        assert_eq!(pending_ddls.len(), 1);
2078        assert!(!scheduler.region_status.contains_key(&region_id));
2079        assert_eq!(job_scheduler.num_jobs(), 0);
2080        assert_matches!(manual_rx.await.unwrap(), Err(_));
2081    }
2082
2083    #[tokio::test]
2084    async fn test_pending_ddl_request_failed_on_compaction_failed() {
2085        let env = SchedulerEnv::new().await;
2086        let (tx, _rx) = mpsc::channel(4);
2087        let mut scheduler = env.mock_compaction_scheduler(tx);
2088        let builder = VersionControlBuilder::new();
2089        let version_control = Arc::new(builder.build());
2090        let region_id = builder.region_id();
2091
2092        scheduler.region_status.insert(
2093            region_id,
2094            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2095        );
2096
2097        let (output_tx, output_rx) = oneshot::channel();
2098        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2099            region_id,
2100            sender: OptionOutputTx::from(output_tx),
2101            request: crate::request::DdlRequest::EnterStaging(
2102                store_api::region_request::EnterStagingRequest {
2103                    partition_directive:
2104                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2105                },
2106            ),
2107        });
2108
2109        assert!(scheduler.has_pending_ddls(region_id));
2110        scheduler
2111            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
2112
2113        assert!(!scheduler.has_pending_ddls(region_id));
2114        let result = output_rx.await.unwrap();
2115        assert_matches!(result, Err(_));
2116    }
2117
2118    #[tokio::test]
2119    async fn test_pending_ddl_request_failed_on_region_closed() {
2120        let env = SchedulerEnv::new().await;
2121        let (tx, _rx) = mpsc::channel(4);
2122        let mut scheduler = env.mock_compaction_scheduler(tx);
2123        let builder = VersionControlBuilder::new();
2124        let version_control = Arc::new(builder.build());
2125        let region_id = builder.region_id();
2126
2127        scheduler.region_status.insert(
2128            region_id,
2129            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2130        );
2131
2132        let (output_tx, output_rx) = oneshot::channel();
2133        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2134            region_id,
2135            sender: OptionOutputTx::from(output_tx),
2136            request: crate::request::DdlRequest::EnterStaging(
2137                store_api::region_request::EnterStagingRequest {
2138                    partition_directive:
2139                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2140                },
2141            ),
2142        });
2143
2144        assert!(scheduler.has_pending_ddls(region_id));
2145        scheduler.on_region_closed(region_id);
2146
2147        assert!(!scheduler.has_pending_ddls(region_id));
2148        let result = output_rx.await.unwrap();
2149        assert_matches!(result, Err(_));
2150    }
2151
2152    #[tokio::test]
2153    async fn test_pending_ddl_request_failed_on_region_dropped() {
2154        let env = SchedulerEnv::new().await;
2155        let (tx, _rx) = mpsc::channel(4);
2156        let mut scheduler = env.mock_compaction_scheduler(tx);
2157        let builder = VersionControlBuilder::new();
2158        let version_control = Arc::new(builder.build());
2159        let region_id = builder.region_id();
2160
2161        scheduler.region_status.insert(
2162            region_id,
2163            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2164        );
2165
2166        let (output_tx, output_rx) = oneshot::channel();
2167        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2168            region_id,
2169            sender: OptionOutputTx::from(output_tx),
2170            request: crate::request::DdlRequest::EnterStaging(
2171                store_api::region_request::EnterStagingRequest {
2172                    partition_directive:
2173                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2174                },
2175            ),
2176        });
2177
2178        assert!(scheduler.has_pending_ddls(region_id));
2179        scheduler.on_region_dropped(region_id);
2180
2181        assert!(!scheduler.has_pending_ddls(region_id));
2182        let result = output_rx.await.unwrap();
2183        assert_matches!(result, Err(_));
2184    }
2185
2186    #[tokio::test]
2187    async fn test_pending_ddl_request_failed_on_region_truncated() {
2188        let env = SchedulerEnv::new().await;
2189        let (tx, _rx) = mpsc::channel(4);
2190        let mut scheduler = env.mock_compaction_scheduler(tx);
2191        let builder = VersionControlBuilder::new();
2192        let version_control = Arc::new(builder.build());
2193        let region_id = builder.region_id();
2194
2195        scheduler.region_status.insert(
2196            region_id,
2197            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2198        );
2199
2200        let (output_tx, output_rx) = oneshot::channel();
2201        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2202            region_id,
2203            sender: OptionOutputTx::from(output_tx),
2204            request: crate::request::DdlRequest::EnterStaging(
2205                store_api::region_request::EnterStagingRequest {
2206                    partition_directive:
2207                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2208                },
2209            ),
2210        });
2211
2212        assert!(scheduler.has_pending_ddls(region_id));
2213        scheduler.on_region_truncated(region_id);
2214
2215        assert!(!scheduler.has_pending_ddls(region_id));
2216        let result = output_rx.await.unwrap();
2217        assert_matches!(result, Err(_));
2218    }
2219
2220    #[tokio::test]
2221    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
2222        let job_scheduler = Arc::new(VecScheduler::default());
2223        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2224        let (tx, _rx) = mpsc::channel(4);
2225        let mut scheduler = env.mock_compaction_scheduler(tx);
2226        let builder = VersionControlBuilder::new();
2227        let version_control = Arc::new(builder.build());
2228        let region_id = builder.region_id();
2229        let manifest_ctx = env
2230            .mock_manifest_context(version_control.current().version.metadata.clone())
2231            .await;
2232        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2233
2234        scheduler.region_status.insert(
2235            region_id,
2236            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2237        );
2238        scheduler
2239            .region_status
2240            .get_mut(&region_id)
2241            .unwrap()
2242            .start_local_task();
2243
2244        let (output_tx, _output_rx) = oneshot::channel();
2245        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2246            region_id,
2247            sender: OptionOutputTx::from(output_tx),
2248            request: crate::request::DdlRequest::EnterStaging(
2249                store_api::region_request::EnterStagingRequest {
2250                    partition_directive:
2251                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2252                },
2253            ),
2254        });
2255
2256        let pending_ddls = scheduler
2257            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2258            .await;
2259
2260        assert_eq!(pending_ddls.len(), 1);
2261        assert!(!scheduler.has_pending_ddls(region_id));
2262        assert!(!scheduler.region_status.contains_key(&region_id));
2263        assert_eq!(job_scheduler.num_jobs(), 0);
2264    }
2265
2266    #[tokio::test]
2267    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
2268        let env = SchedulerEnv::new().await;
2269        let (tx, _rx) = mpsc::channel(4);
2270        let mut scheduler = env.mock_compaction_scheduler(tx);
2271        let builder = VersionControlBuilder::new();
2272        let version_control = Arc::new(builder.build());
2273        let region_id = builder.region_id();
2274        let manifest_ctx = env
2275            .mock_manifest_context(version_control.current().version.metadata.clone())
2276            .await;
2277        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2278
2279        let (manual_tx, manual_rx) = oneshot::channel();
2280        let mut status =
2281            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2282        status.start_local_task();
2283        status.set_pending_request(PendingCompaction {
2284            options: compact_request::Options::Regular(Default::default()),
2285            waiter: OptionOutputTx::from(manual_tx),
2286            max_parallelism: 1,
2287        });
2288        scheduler.region_status.insert(region_id, status);
2289
2290        let (ddl_tx, _ddl_rx) = oneshot::channel();
2291        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2292            region_id,
2293            sender: OptionOutputTx::from(ddl_tx),
2294            request: crate::request::DdlRequest::EnterStaging(
2295                store_api::region_request::EnterStagingRequest {
2296                    partition_directive:
2297                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2298                },
2299            ),
2300        });
2301
2302        let pending_ddls = scheduler
2303            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2304            .await;
2305
2306        assert_eq!(pending_ddls.len(), 1);
2307        assert!(!scheduler.region_status.contains_key(&region_id));
2308        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
2309    }
2310
2311    #[tokio::test]
2312    async fn test_on_compaction_finished_returns_empty_when_region_absent() {
2313        let env = SchedulerEnv::new().await;
2314        let (tx, _rx) = mpsc::channel(4);
2315        let mut scheduler = env.mock_compaction_scheduler(tx);
2316        let builder = VersionControlBuilder::new();
2317        let region_id = builder.region_id();
2318        let version_control = Arc::new(builder.build());
2319        let manifest_ctx = env
2320            .mock_manifest_context(version_control.current().version.metadata.clone())
2321            .await;
2322        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2323
2324        let pending_ddls = scheduler
2325            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2326            .await;
2327
2328        assert!(pending_ddls.is_empty());
2329    }
2330
2331    #[tokio::test]
2332    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
2333        let env = SchedulerEnv::new()
2334            .await
2335            .scheduler(Arc::new(FailingScheduler));
2336        let (tx, _rx) = mpsc::channel(4);
2337        let mut scheduler = env.mock_compaction_scheduler(tx);
2338        let mut builder = VersionControlBuilder::new();
2339        let end = 1000 * 1000;
2340        let version_control = Arc::new(
2341            builder
2342                .push_l0_file(0, end)
2343                .push_l0_file(10, end)
2344                .push_l0_file(50, end)
2345                .push_l0_file(80, end)
2346                .push_l0_file(90, end)
2347                .build(),
2348        );
2349        let region_id = builder.region_id();
2350        let manifest_ctx = env
2351            .mock_manifest_context(version_control.current().version.metadata.clone())
2352            .await;
2353        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2354
2355        let (manual_tx, manual_rx) = oneshot::channel();
2356        let mut status =
2357            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2358        status.start_local_task();
2359        status.set_pending_request(PendingCompaction {
2360            options: compact_request::Options::Regular(Default::default()),
2361            waiter: OptionOutputTx::from(manual_tx),
2362            max_parallelism: 1,
2363        });
2364        scheduler.region_status.insert(region_id, status);
2365
2366        let (ddl_tx, ddl_rx) = oneshot::channel();
2367        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2368            region_id,
2369            sender: OptionOutputTx::from(ddl_tx),
2370            request: crate::request::DdlRequest::EnterStaging(
2371                store_api::region_request::EnterStagingRequest {
2372                    partition_directive:
2373                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2374                },
2375            ),
2376        });
2377
2378        let pending_ddls = scheduler
2379            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2380            .await;
2381
2382        assert!(pending_ddls.is_empty());
2383        assert!(!scheduler.region_status.contains_key(&region_id));
2384        assert!(manual_rx.await.is_err());
2385        assert_matches!(ddl_rx.await.unwrap(), Err(_));
2386    }
2387
2388    #[tokio::test]
2389    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
2390        let env = SchedulerEnv::new().await;
2391        let (tx, _rx) = mpsc::channel(4);
2392        let mut scheduler = env.mock_compaction_scheduler(tx);
2393        let builder = VersionControlBuilder::new();
2394        let version_control = Arc::new(builder.build());
2395        let region_id = builder.region_id();
2396        let manifest_ctx = env
2397            .mock_manifest_context(version_control.current().version.metadata.clone())
2398            .await;
2399        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2400
2401        scheduler.region_status.insert(
2402            region_id,
2403            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2404        );
2405        scheduler
2406            .region_status
2407            .get_mut(&region_id)
2408            .unwrap()
2409            .start_local_task();
2410
2411        let pending_ddls = scheduler
2412            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2413            .await;
2414
2415        assert!(pending_ddls.is_empty());
2416        assert!(scheduler.region_status.contains_key(&region_id));
2417
2418        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2419        // With no compactable files, next scheduling returns false and removes
2420        // the status without creating a background task.
2421        let scheduled = scheduler
2422            .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2423            .await;
2424        assert!(!scheduled);
2425        assert!(!scheduler.region_status.contains_key(&region_id));
2426    }
2427
2428    #[tokio::test]
2429    async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
2430        let env = SchedulerEnv::new()
2431            .await
2432            .scheduler(Arc::new(FailingScheduler));
2433        let (tx, _rx) = mpsc::channel(4);
2434        let mut scheduler = env.mock_compaction_scheduler(tx);
2435        let mut builder = VersionControlBuilder::new();
2436        let end = 1000 * 1000;
2437        let version_control = Arc::new(
2438            builder
2439                .push_l0_file(0, end)
2440                .push_l0_file(10, end)
2441                .push_l0_file(50, end)
2442                .push_l0_file(80, end)
2443                .push_l0_file(90, end)
2444                .build(),
2445        );
2446        let region_id = builder.region_id();
2447        let manifest_ctx = env
2448            .mock_manifest_context(version_control.current().version.metadata.clone())
2449            .await;
2450        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2451
2452        scheduler.region_status.insert(
2453            region_id,
2454            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2455        );
2456        scheduler
2457            .region_status
2458            .get_mut(&region_id)
2459            .unwrap()
2460            .start_local_task();
2461
2462        let pending_ddls = scheduler
2463            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2464            .await;
2465
2466        assert!(pending_ddls.is_empty());
2467        assert!(scheduler.region_status.contains_key(&region_id));
2468
2469        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2470        // The failing scheduler simulates a submit error; callers must see false.
2471        let scheduled = scheduler
2472            .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2473            .await;
2474        assert!(!scheduled);
2475        assert!(!scheduler.region_status.contains_key(&region_id));
2476    }
2477
2478    #[tokio::test]
2479    async fn test_concurrent_memory_competition() {
2480        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
2481        let barrier = Arc::new(Barrier::new(3));
2482        let mut handles = vec![];
2483
2484        // Spawn 3 tasks competing for memory, each trying to acquire 2MB
2485        for _i in 0..3 {
2486            let mgr = manager.clone();
2487            let bar = barrier.clone();
2488            let handle = tokio::spawn(async move {
2489                bar.wait().await; // Synchronize start
2490                mgr.try_acquire(2 * 1024 * 1024)
2491            });
2492            handles.push(handle);
2493        }
2494
2495        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
2496            .await
2497            .into_iter()
2498            .map(|r| r.unwrap())
2499            .collect();
2500
2501        // Only 1 should succeed (3MB limit, 2MB request, can only fit one)
2502        let succeeded = results.iter().filter(|r| r.is_some()).count();
2503        let failed = results.iter().filter(|r| r.is_none()).count();
2504
2505        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
2506        assert_eq!(failed, 2, "Expected 2 tasks to fail");
2507
2508        // Clean up
2509        drop(results);
2510        assert_eq!(manager.used_bytes(), 0);
2511    }
2512}