1mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use common_base::Plugins;
33use common_memory_manager::OnExhaustedPolicy;
34use common_meta::key::SchemaMetadataManagerRef;
35use common_telemetry::{debug, error, info, warn};
36use common_time::range::TimestampRange;
37use common_time::timestamp::TimeUnit;
38use common_time::{TimeToLive, Timestamp};
39use datafusion_common::ScalarValue;
40use datafusion_expr::Expr;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt};
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::{RegionId, TableId};
45use task::MAX_PARALLEL_COMPACTION;
46use tokio::sync::mpsc::{self, Sender};
47
48use crate::access_layer::AccessLayerRef;
49use crate::cache::{CacheManagerRef, CacheStrategy};
50use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
51use crate::compaction::memory_manager::CompactionMemoryManager;
52use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
53use crate::compaction::task::CompactionTaskImpl;
54use crate::config::MitoConfig;
55use crate::error::{
56 CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
57 RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
58 TimeRangePredicateOverflowSnafu, TimeoutSnafu,
59};
60use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
61use crate::read::BoxedRecordBatchStream;
62use crate::read::projection::ProjectionMapper;
63use crate::read::scan_region::{PredicateGroup, ScanInput};
64use crate::read::seq_scan::SeqScan;
65use crate::region::options::{MergeMode, RegionOptions};
66use crate::region::version::VersionControlRef;
67use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
68use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
69use crate::schedule::remote_job_scheduler::{
70 CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
71};
72use crate::schedule::scheduler::SchedulerRef;
73use crate::sst::file::{FileHandle, FileMeta, Level};
74use crate::sst::version::LevelMeta;
75use crate::worker::WorkerListener;
76
/// Everything needed to build and schedule one compaction for a region.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Snapshot of the region version whose SSTs are compacted.
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Channel used to send requests back to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters to notify once the compaction finishes or fails.
    pub(crate) waiters: Vec<OutputTx>,
    /// Time when this request was created.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Maximum parallelism allowed for this compaction.
    pub(crate) max_parallelism: usize,
}
94
95impl CompactionRequest {
96 pub(crate) fn region_id(&self) -> RegionId {
97 self.current_version.metadata.region_id
98 }
99}
100
/// Tracks and schedules compactions for the regions owned by one worker.
pub(crate) struct CompactionScheduler {
    /// Background scheduler that runs compaction jobs.
    scheduler: SchedulerRef,
    /// Per-region status; a region is present only while it has an in-flight compaction.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Channel back to the region worker.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    /// Limits memory used by concurrent compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Policy applied when the compaction memory budget is exhausted.
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    /// Plugins, e.g. an optional remote job scheduler.
    plugins: Plugins,
}
116
117impl CompactionScheduler {
118 #[allow(clippy::too_many_arguments)]
119 pub(crate) fn new(
120 scheduler: SchedulerRef,
121 request_sender: Sender<WorkerRequestWithTime>,
122 cache_manager: CacheManagerRef,
123 engine_config: Arc<MitoConfig>,
124 listener: WorkerListener,
125 plugins: Plugins,
126 memory_manager: Arc<CompactionMemoryManager>,
127 memory_policy: OnExhaustedPolicy,
128 ) -> Self {
129 Self {
130 scheduler,
131 region_status: HashMap::new(),
132 request_sender,
133 cache_manager,
134 engine_config,
135 memory_manager,
136 memory_policy,
137 listener,
138 plugins,
139 }
140 }
141
142 #[allow(clippy::too_many_arguments)]
144 pub(crate) async fn schedule_compaction(
145 &mut self,
146 region_id: RegionId,
147 compact_options: compact_request::Options,
148 version_control: &VersionControlRef,
149 access_layer: &AccessLayerRef,
150 waiter: OptionOutputTx,
151 manifest_ctx: &ManifestContextRef,
152 schema_metadata_manager: SchemaMetadataManagerRef,
153 max_parallelism: usize,
154 ) -> Result<()> {
155 let current_state = manifest_ctx.current_state();
157 if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
158 info!(
159 "Skipping compaction for region {} in staging mode, options: {:?}",
160 region_id, compact_options
161 );
162 waiter.send(Ok(0));
163 return Ok(());
164 }
165
166 if let Some(status) = self.region_status.get_mut(®ion_id) {
167 match compact_options {
168 Options::Regular(_) => {
169 status.merge_waiter(waiter);
171 }
172 options @ Options::StrictWindow(_) => {
173 status.set_pending_request(PendingCompaction {
175 options,
176 waiter,
177 max_parallelism,
178 });
179 info!(
180 "Region {} is compacting, manually compaction will be re-scheduled.",
181 region_id
182 );
183 }
184 }
185 return Ok(());
186 }
187
188 let mut status: CompactionStatus =
190 CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
191 let request = status.new_compaction_request(
192 self.request_sender.clone(),
193 waiter,
194 self.engine_config.clone(),
195 self.cache_manager.clone(),
196 manifest_ctx,
197 self.listener.clone(),
198 schema_metadata_manager,
199 max_parallelism,
200 );
201
202 let result = self
203 .schedule_compaction_request(request, compact_options)
204 .await;
205 if matches!(result, Ok(true)) {
206 self.region_status.insert(region_id, status);
209 }
210
211 self.listener.on_compaction_scheduled(region_id);
212 result.map(|_| ())
213 }
214
215 pub(crate) async fn handle_pending_compaction_request(
219 &mut self,
220 region_id: RegionId,
221 manifest_ctx: &ManifestContextRef,
222 schema_metadata_manager: SchemaMetadataManagerRef,
223 ) -> bool {
224 let Some(status) = self.region_status.get_mut(®ion_id) else {
225 return true;
226 };
227
228 let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
231 return false;
232 };
233
234 let PendingCompaction {
235 options,
236 waiter,
237 max_parallelism,
238 } = pending_request;
239
240 let request = {
241 status.new_compaction_request(
242 self.request_sender.clone(),
243 waiter,
244 self.engine_config.clone(),
245 self.cache_manager.clone(),
246 manifest_ctx,
247 self.listener.clone(),
248 schema_metadata_manager,
249 max_parallelism,
250 )
251 };
252
253 match self.schedule_compaction_request(request, options).await {
254 Ok(true) => {
255 debug!(
256 "Successfully scheduled manual compaction for region id: {}",
257 region_id
258 );
259 true
260 }
261 Ok(false) => {
262 false
265 }
266 Err(e) => {
267 error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
268 self.remove_region_on_failure(region_id, Arc::new(e));
269 true
270 }
271 }
272 }
273
274 pub(crate) async fn on_compaction_finished(
276 &mut self,
277 region_id: RegionId,
278 manifest_ctx: &ManifestContextRef,
279 schema_metadata_manager: SchemaMetadataManagerRef,
280 ) -> Vec<SenderDdlRequest> {
281 if self
284 .handle_pending_compaction_request(
285 region_id,
286 manifest_ctx,
287 schema_metadata_manager.clone(),
288 )
289 .await
290 {
291 return Vec::new();
292 }
293
294 let Some(status) = self.region_status.get_mut(®ion_id) else {
295 return Vec::new();
298 };
299
300 for waiter in std::mem::take(&mut status.waiters) {
302 waiter.send(Ok(0));
303 }
304
305 let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
307 if !pending_ddl_requests.is_empty() {
308 self.region_status.remove(®ion_id);
309 return pending_ddl_requests;
312 }
313
314 let request = status.new_compaction_request(
316 self.request_sender.clone(),
317 OptionOutputTx::none(),
318 self.engine_config.clone(),
319 self.cache_manager.clone(),
320 manifest_ctx,
321 self.listener.clone(),
322 schema_metadata_manager,
323 MAX_PARALLEL_COMPACTION,
324 );
325
326 match self
328 .schedule_compaction_request(
329 request,
330 compact_request::Options::Regular(Default::default()),
331 )
332 .await
333 {
334 Ok(true) => {
335 debug!(
336 "Successfully scheduled next compaction for region id: {}",
337 region_id
338 );
339 }
340 Ok(false) => {
341 self.region_status.remove(®ion_id);
345 }
346 Err(e) => {
347 error!(e; "Failed to schedule next compaction for region {}", region_id);
348 self.remove_region_on_failure(region_id, Arc::new(e));
349 }
350 }
351
352 Vec::new()
353 }
354
355 pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
357 error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
358 self.remove_region_on_failure(region_id, err);
359 }
360
361 pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
363 self.remove_region_on_failure(
364 region_id,
365 Arc::new(RegionDroppedSnafu { region_id }.build()),
366 );
367 }
368
369 pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
371 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
372 }
373
374 pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
376 self.remove_region_on_failure(
377 region_id,
378 Arc::new(RegionTruncatedSnafu { region_id }.build()),
379 );
380 }
381
382 pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
387 debug!(
388 "Added pending DDL request for region: {}, ddl: {:?}",
389 request.region_id, request.request
390 );
391 let status = self.region_status.get_mut(&request.region_id).unwrap();
392 status.pending_ddl_requests.push(request);
393 }
394
395 #[cfg(test)]
396 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
397 let has_pending = self
398 .region_status
399 .get(®ion_id)
400 .map(|status| !status.pending_ddl_requests.is_empty())
401 .unwrap_or(false);
402 debug!(
403 "Checked pending DDL requests for region: {}, has_pending: {}",
404 region_id, has_pending
405 );
406 has_pending
407 }
408
409 pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
411 self.region_status.contains_key(®ion_id)
412 }
413
414 async fn schedule_compaction_request(
419 &mut self,
420 request: CompactionRequest,
421 options: compact_request::Options,
422 ) -> Result<bool> {
423 let region_id = request.region_id();
424 let (dynamic_compaction_opts, ttl) = find_dynamic_options(
425 region_id.table_id(),
426 &request.current_version.options,
427 &request.schema_metadata_manager,
428 )
429 .await
430 .unwrap_or_else(|e| {
431 warn!(e; "Failed to find dynamic options for region: {}", region_id);
432 (
433 request.current_version.options.compaction.clone(),
434 request.current_version.options.ttl.unwrap_or_default(),
435 )
436 });
437
438 let picker = new_picker(
439 &options,
440 &dynamic_compaction_opts,
441 request.current_version.options.append_mode,
442 Some(self.engine_config.max_background_compactions),
443 );
444 let region_id = request.region_id();
445 let CompactionRequest {
446 engine_config,
447 current_version,
448 access_layer,
449 request_sender,
450 waiters,
451 start_time,
452 cache_manager,
453 manifest_ctx,
454 listener,
455 schema_metadata_manager: _,
456 max_parallelism,
457 } = request;
458
459 debug!(
460 "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
461 picker, region_id, ttl
462 );
463
464 let compaction_region = CompactionRegion {
465 region_id,
466 current_version: current_version.clone(),
467 region_options: RegionOptions {
468 compaction: dynamic_compaction_opts.clone(),
469 ..current_version.options.clone()
470 },
471 engine_config: engine_config.clone(),
472 region_metadata: current_version.metadata.clone(),
473 cache_manager: cache_manager.clone(),
474 access_layer: access_layer.clone(),
475 manifest_ctx: manifest_ctx.clone(),
476 file_purger: None,
477 ttl: Some(ttl),
478 max_parallelism,
479 };
480
481 let picker_output = {
482 let _pick_timer = COMPACTION_STAGE_ELAPSED
483 .with_label_values(&["pick"])
484 .start_timer();
485 picker.pick(&compaction_region)
486 };
487
488 let picker_output = if let Some(picker_output) = picker_output {
489 picker_output
490 } else {
491 for waiter in waiters {
493 waiter.send(Ok(0));
494 }
495 return Ok(false);
496 };
497
498 let waiters = if dynamic_compaction_opts.remote_compaction() {
501 if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
502 let remote_compaction_job = CompactionJob {
503 compaction_region: compaction_region.clone(),
504 picker_output: picker_output.clone(),
505 start_time,
506 waiters,
507 ttl,
508 };
509
510 let result = remote_job_scheduler
511 .schedule(
512 RemoteJob::CompactionJob(remote_compaction_job),
513 Box::new(DefaultNotifier {
514 request_sender: request_sender.clone(),
515 }),
516 )
517 .await;
518
519 match result {
520 Ok(job_id) => {
521 info!(
522 "Scheduled remote compaction job {} for region {}",
523 job_id, region_id
524 );
525 INFLIGHT_COMPACTION_COUNT.inc();
526 return Ok(true);
527 }
528 Err(e) => {
529 if !dynamic_compaction_opts.fallback_to_local() {
530 error!(e; "Failed to schedule remote compaction job for region {}", region_id);
531 return RemoteCompactionSnafu {
532 region_id,
533 job_id: None,
534 reason: e.reason,
535 }
536 .fail();
537 }
538
539 error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
540
541 e.waiters
543 }
544 }
545 } else {
546 debug!(
547 "Remote compaction is not enabled, fallback to local compaction for region {}",
548 region_id
549 );
550 waiters
551 }
552 } else {
553 waiters
554 };
555
556 let estimated_bytes = estimate_compaction_bytes(&picker_output);
558 let local_compaction_task = Box::new(CompactionTaskImpl {
559 request_sender,
560 waiters,
561 start_time,
562 listener,
563 picker_output,
564 compaction_region,
565 compactor: Arc::new(DefaultCompactor {}),
566 memory_manager: self.memory_manager.clone(),
567 memory_policy: self.memory_policy,
568 estimated_memory_bytes: estimated_bytes,
569 });
570
571 self.submit_compaction_task(local_compaction_task, region_id)
572 .map(|_| true)
573 }
574
575 fn submit_compaction_task(
576 &mut self,
577 mut task: Box<CompactionTaskImpl>,
578 region_id: RegionId,
579 ) -> Result<()> {
580 self.scheduler
581 .schedule(Box::pin(async move {
582 INFLIGHT_COMPACTION_COUNT.inc();
583 task.run().await;
584 INFLIGHT_COMPACTION_COUNT.dec();
585 }))
586 .inspect_err(
587 |e| error!(e; "Failed to submit compaction request for region {}", region_id),
588 )
589 }
590
591 fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
592 let Some(status) = self.region_status.remove(®ion_id) else {
594 return;
595 };
596
597 status.on_failure(err);
599 }
600}
601
602impl Drop for CompactionScheduler {
603 fn drop(&mut self) {
604 for (region_id, status) in self.region_status.drain() {
605 status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
607 }
608 }
609}
610
611async fn find_dynamic_options(
613 table_id: TableId,
614 region_options: &crate::region::options::RegionOptions,
615 schema_metadata_manager: &SchemaMetadataManagerRef,
616) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
617 if region_options.compaction_override && region_options.ttl.is_some() {
618 debug!(
619 "Use region options directly for table {}: compaction={:?}, ttl={:?}",
620 table_id, region_options.compaction, region_options.ttl
621 );
622 return Ok((
623 region_options.compaction.clone(),
624 region_options.ttl.unwrap(),
625 ));
626 }
627
628 let db_options = tokio::time::timeout(
629 crate::config::FETCH_OPTION_TIMEOUT,
630 schema_metadata_manager.get_schema_options_by_table_id(table_id),
631 )
632 .await
633 .context(TimeoutSnafu)?
634 .context(GetSchemaMetadataSnafu)?;
635
636 let ttl = if region_options.ttl.is_some() {
637 debug!(
638 "Use region TTL directly for table {}: ttl={:?}",
639 table_id, region_options.ttl
640 );
641 region_options.ttl.unwrap()
642 } else {
643 db_options
644 .as_ref()
645 .and_then(|options| options.ttl)
646 .unwrap_or_default()
647 .into()
648 };
649
650 let compaction = if !region_options.compaction_override {
651 if let Some(schema_opts) = db_options {
652 let map: HashMap<String, String> = schema_opts
653 .extra_options
654 .iter()
655 .filter_map(|(k, v)| {
656 if k.starts_with("compaction.") {
657 Some((k.clone(), v.clone()))
658 } else {
659 None
660 }
661 })
662 .collect();
663 if map.is_empty() {
664 region_options.compaction.clone()
665 } else {
666 crate::region::options::RegionOptions::try_from(&map)
667 .map(|o| o.compaction)
668 .unwrap_or_else(|e| {
669 error!(e; "Failed to create RegionOptions from map");
670 region_options.compaction.clone()
671 })
672 }
673 } else {
674 debug!(
675 "DB options is None for table {}, use region compaction: compaction={:?}",
676 table_id, region_options.compaction
677 );
678 region_options.compaction.clone()
679 }
680 } else {
681 debug!(
682 "No schema options for table {}, use region compaction: compaction={:?}",
683 table_id, region_options.compaction
684 );
685 region_options.compaction.clone()
686 };
687
688 debug!(
689 "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
690 table_id, compaction, ttl
691 );
692 Ok((compaction, ttl))
693}
694
/// State tracked for a region that currently has an in-flight compaction.
struct CompactionStatus {
    /// Id of the region being compacted.
    region_id: RegionId,
    version_control: VersionControlRef,
    access_layer: AccessLayerRef,
    /// Waiters of the in-flight compaction.
    waiters: Vec<OutputTx>,
    /// Manual compaction to re-schedule after the current one finishes.
    pending_request: Option<PendingCompaction>,
    /// DDL requests queued until the compaction finishes.
    pending_ddl_requests: Vec<SenderDdlRequest>,
}
710
711impl CompactionStatus {
712 fn new(
714 region_id: RegionId,
715 version_control: VersionControlRef,
716 access_layer: AccessLayerRef,
717 ) -> CompactionStatus {
718 CompactionStatus {
719 region_id,
720 version_control,
721 access_layer,
722 waiters: Vec::new(),
723 pending_request: None,
724 pending_ddl_requests: Vec::new(),
725 }
726 }
727
728 fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
730 if let Some(waiter) = waiter.take_inner() {
731 self.waiters.push(waiter);
732 }
733 }
734
735 fn set_pending_request(&mut self, pending: PendingCompaction) {
737 if let Some(prev) = self.pending_request.replace(pending) {
738 debug!(
739 "Replace pending compaction options with new request {:?} for region: {}",
740 prev.options, self.region_id
741 );
742 prev.waiter.send(ManualCompactionOverrideSnafu.fail());
743 }
744 }
745
746 fn on_failure(mut self, err: Arc<Error>) {
747 for waiter in self.waiters.drain(..) {
748 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
749 region_id: self.region_id,
750 }));
751 }
752
753 if let Some(pending_compaction) = self.pending_request {
754 pending_compaction
755 .waiter
756 .send(Err(err.clone()).context(CompactRegionSnafu {
757 region_id: self.region_id,
758 }));
759 }
760
761 for pending_ddl in self.pending_ddl_requests {
762 pending_ddl
763 .sender
764 .send(Err(err.clone()).context(CompactRegionSnafu {
765 region_id: self.region_id,
766 }));
767 }
768 }
769
770 #[allow(clippy::too_many_arguments)]
774 fn new_compaction_request(
775 &mut self,
776 request_sender: Sender<WorkerRequestWithTime>,
777 mut waiter: OptionOutputTx,
778 engine_config: Arc<MitoConfig>,
779 cache_manager: CacheManagerRef,
780 manifest_ctx: &ManifestContextRef,
781 listener: WorkerListener,
782 schema_metadata_manager: SchemaMetadataManagerRef,
783 max_parallelism: usize,
784 ) -> CompactionRequest {
785 let current_version = CompactionVersion::from(self.version_control.current().version);
786 let start_time = Instant::now();
787 let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
788 waiters.extend(std::mem::take(&mut self.waiters));
789
790 if let Some(waiter) = waiter.take_inner() {
791 waiters.push(waiter);
792 }
793
794 CompactionRequest {
795 engine_config,
796 current_version,
797 access_layer: self.access_layer.clone(),
798 request_sender: request_sender.clone(),
799 waiters,
800 start_time,
801 cache_manager,
802 manifest_ctx: manifest_ctx.clone(),
803 listener,
804 schema_metadata_manager,
805 max_parallelism,
806 }
807 }
808}
809
/// Output of a single compaction merge.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level the merged output is written to.
    pub output_level: Level,
    /// Input files to merge.
    pub inputs: Vec<FileHandle>,
    /// Whether deleted rows should be filtered while reading the inputs.
    pub filter_deleted: bool,
    /// Optional time range constraint applied to the output.
    pub output_time_range: Option<TimestampRange>,
}
821
/// Serializable counterpart of [`CompactionOutput`] that stores `FileMeta`
/// instead of `FileHandle`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}
830
/// Builder for the record batch stream that reads the input SSTs of a compaction.
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    /// Input files to read.
    inputs: &'a [FileHandle],
    append_mode: bool,
    /// Whether deleted rows are filtered out while reading.
    filter_deleted: bool,
    /// Optional time range turned into a predicate on the time index column.
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}
842
843impl CompactionSstReaderBuilder<'_> {
844 async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
846 let scan_input = self.build_scan_input()?.with_compaction(true);
847
848 SeqScan::new(scan_input)
849 .build_flat_reader_for_compaction()
850 .await
851 }
852
853 fn build_scan_input(self) -> Result<ScanInput> {
854 let mapper = ProjectionMapper::all(&self.metadata, true)?;
855 let mut scan_input = ScanInput::new(self.sst_layer, mapper)
856 .with_files(self.inputs.to_vec())
857 .with_append_mode(self.append_mode)
858 .with_cache(CacheStrategy::Compaction(self.cache))
860 .with_filter_deleted(self.filter_deleted)
861 .with_ignore_file_not_found(true)
863 .with_merge_mode(self.merge_mode)
864 .with_flat_format(true);
865
866 if let Some(time_range) = self.time_range {
869 scan_input =
870 scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
871 }
872
873 Ok(scan_input)
874 }
875}
876
877fn time_range_to_predicate(
879 range: TimestampRange,
880 metadata: &RegionMetadataRef,
881) -> Result<PredicateGroup> {
882 let ts_col = metadata.time_index_column();
883
884 let ts_col_unit = ts_col
886 .column_schema
887 .data_type
888 .as_timestamp()
889 .unwrap()
890 .unit();
891
892 let exprs = match (range.start(), range.end()) {
893 (Some(start), Some(end)) => {
894 vec![
895 datafusion_expr::col(ts_col.column_schema.name.clone())
896 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
897 datafusion_expr::col(ts_col.column_schema.name.clone())
898 .lt(ts_to_lit(*end, ts_col_unit)?),
899 ]
900 }
901 (Some(start), None) => {
902 vec![
903 datafusion_expr::col(ts_col.column_schema.name.clone())
904 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
905 ]
906 }
907
908 (None, Some(end)) => {
909 vec![
910 datafusion_expr::col(ts_col.column_schema.name.clone())
911 .lt(ts_to_lit(*end, ts_col_unit)?),
912 ]
913 }
914 (None, None) => {
915 return Ok(PredicateGroup::default());
916 }
917 };
918
919 let predicate = PredicateGroup::new(metadata, &exprs)?;
920 Ok(predicate)
921}
922
923fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
924 let ts = ts
925 .convert_to(ts_col_unit)
926 .context(TimeRangePredicateOverflowSnafu {
927 timestamp: ts,
928 unit: ts_col_unit,
929 })?;
930 let val = ts.value();
931 let scalar_value = match ts_col_unit {
932 TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
933 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
934 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
935 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
936 };
937 Ok(datafusion_expr::lit(scalar_value))
938}
939
940fn get_expired_ssts(
942 levels: &[LevelMeta],
943 ttl: Option<TimeToLive>,
944 now: Timestamp,
945) -> Vec<FileHandle> {
946 let Some(ttl) = ttl else {
947 return vec![];
948 };
949
950 levels
951 .iter()
952 .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
953 .collect()
954}
955
956fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
959 picker_output
960 .outputs
961 .iter()
962 .flat_map(|output| output.inputs.iter())
963 .map(|file: &FileHandle| {
964 let meta = file.meta_ref();
965 meta.max_row_group_uncompressed_size
966 })
967 .sum()
968}
969
/// A manual compaction waiting for the current compaction to finish.
struct PendingCompaction {
    /// Options of the pending compaction.
    pub(crate) options: compact_request::Options,
    /// Waiter to notify when the pending compaction completes or is overridden.
    pub(crate) waiter: OptionOutputTx,
    /// Maximum parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}
980
981#[cfg(test)]
982mod tests {
983 use std::assert_matches::assert_matches;
984 use std::time::Duration;
985
986 use api::v1::region::StrictWindow;
987 use common_datasource::compression::CompressionType;
988 use common_meta::key::schema_name::SchemaNameValue;
989 use common_time::DatabaseTimeToLive;
990 use tokio::sync::{Barrier, oneshot};
991
992 use super::*;
993 use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
994 use crate::error::InvalidSchedulerStateSnafu;
995 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
996 use crate::region::ManifestContext;
997 use crate::schedule::scheduler::{Job, Scheduler};
998 use crate::sst::FormatType;
999 use crate::test_util::mock_schema_metadata_manager;
1000 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1001 use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1002
    /// Scheduler stub whose `schedule` always fails; used to exercise error paths.
    struct FailingScheduler;

    #[async_trait::async_trait]
    impl Scheduler for FailingScheduler {
        // Always rejects the job with `InvalidSchedulerState`.
        fn schedule(&self, _job: Job) -> Result<()> {
            InvalidSchedulerStateSnafu.fail()
        }

        async fn stop(&self, _await_termination: bool) -> Result<()> {
            Ok(())
        }
    }
1015
1016 #[tokio::test]
1017 async fn test_find_compaction_options_db_level() {
1018 let env = SchedulerEnv::new().await;
1019 let builder = VersionControlBuilder::new();
1020 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1021 let region_id = builder.region_id();
1022 let table_id = region_id.table_id();
1023 let mut schema_value = SchemaNameValue {
1025 ttl: Some(DatabaseTimeToLive::default()),
1026 ..Default::default()
1027 };
1028 schema_value
1029 .extra_options
1030 .insert("compaction.type".to_string(), "twcs".to_string());
1031 schema_value
1032 .extra_options
1033 .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
1034 schema_metadata_manager
1035 .register_region_table_info(
1036 table_id,
1037 "t",
1038 "c",
1039 "s",
1040 Some(schema_value),
1041 kv_backend.clone(),
1042 )
1043 .await;
1044
1045 let version_control = Arc::new(builder.build());
1046 let region_opts = version_control.current().version.options.clone();
1047 let (opts, _) = find_dynamic_options(table_id, ®ion_opts, &schema_metadata_manager)
1048 .await
1049 .unwrap();
1050 match opts {
1051 crate::region::options::CompactionOptions::Twcs(t) => {
1052 assert_eq!(t.time_window_seconds(), Some(2 * 3600));
1053 }
1054 }
1055 let manifest_ctx = env
1056 .mock_manifest_context(version_control.current().version.metadata.clone())
1057 .await;
1058 let (tx, _rx) = mpsc::channel(4);
1059 let mut scheduler = env.mock_compaction_scheduler(tx);
1060 let (otx, _orx) = oneshot::channel();
1061 let request = scheduler
1062 .region_status
1063 .entry(region_id)
1064 .or_insert_with(|| {
1065 crate::compaction::CompactionStatus::new(
1066 region_id,
1067 version_control.clone(),
1068 env.access_layer.clone(),
1069 )
1070 })
1071 .new_compaction_request(
1072 scheduler.request_sender.clone(),
1073 OptionOutputTx::new(Some(OutputTx::new(otx))),
1074 scheduler.engine_config.clone(),
1075 scheduler.cache_manager.clone(),
1076 &manifest_ctx,
1077 scheduler.listener.clone(),
1078 schema_metadata_manager.clone(),
1079 1,
1080 );
1081 scheduler
1082 .schedule_compaction_request(
1083 request,
1084 compact_request::Options::Regular(Default::default()),
1085 )
1086 .await
1087 .unwrap();
1088 }
1089
1090 #[tokio::test]
1091 async fn test_find_compaction_options_priority() {
1092 fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
1093 let mut schema_value = SchemaNameValue {
1094 ttl: Some(DatabaseTimeToLive::default()),
1095 ..Default::default()
1096 };
1097 schema_value
1098 .extra_options
1099 .insert("compaction.type".to_string(), "twcs".to_string());
1100 schema_value.extra_options.insert(
1101 "compaction.twcs.time_window".to_string(),
1102 time_window.to_string(),
1103 );
1104 schema_value
1105 }
1106
1107 let cases = [
1108 (
1109 "db options set and table override set",
1110 Some(schema_value_with_twcs("2h")),
1111 true,
1112 Some(Duration::from_secs(5 * 3600)),
1113 Some(5 * 3600),
1114 ),
1115 (
1116 "db options set and table override not set",
1117 Some(schema_value_with_twcs("2h")),
1118 false,
1119 None,
1120 Some(2 * 3600),
1121 ),
1122 (
1123 "db options not set and table override set",
1124 None,
1125 true,
1126 Some(Duration::from_secs(4 * 3600)),
1127 Some(4 * 3600),
1128 ),
1129 (
1130 "db options not set and table override not set",
1131 None,
1132 false,
1133 None,
1134 None,
1135 ),
1136 ];
1137
1138 for (case_name, schema_value, override_set, table_window, expected_window) in cases {
1139 let builder = VersionControlBuilder::new();
1140 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1141 let table_id = builder.region_id().table_id();
1142 schema_metadata_manager
1143 .register_region_table_info(
1144 table_id,
1145 "t",
1146 "c",
1147 "s",
1148 schema_value,
1149 kv_backend.clone(),
1150 )
1151 .await;
1152
1153 let version_control = Arc::new(builder.build());
1154 let mut region_opts = version_control.current().version.options.clone();
1155 region_opts.compaction_override = override_set;
1156 if let Some(window) = table_window {
1157 let crate::region::options::CompactionOptions::Twcs(twcs) =
1158 &mut region_opts.compaction;
1159 twcs.time_window = Some(window);
1160 }
1161
1162 let (opts, _) = find_dynamic_options(table_id, ®ion_opts, &schema_metadata_manager)
1163 .await
1164 .unwrap();
1165 match opts {
1166 crate::region::options::CompactionOptions::Twcs(t) => {
1167 assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
1168 }
1169 }
1170 }
1171 }
1172
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Region with no SST files: scheduling should resolve the waiter with 0
        // and leave no tracked status behind.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // A single L0 file is not enough for the picker to produce work either.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1235
    #[tokio::test]
    async fn test_schedule_on_finished() {
        // Verifies that a request arriving while a job is running only queues a
        // waiter, and that `on_compaction_finished` re-picks the queued work.
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        // Five overlapping L0 files so the picker has something to compact.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // One tracked region and one job submitted to the job scheduler.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Simulate the running job's edit: replace the old L0 files with new ones.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // Schedule again while the first job is still in flight.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Still one status and one job: the second request only added a waiter.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // Finishing the first job schedules the queued work as a second job.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Add more files, then schedule while the second job runs: again no new
        // job is spawned, only another waiter is queued.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }
1368
    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        // A manual (strict-window) compaction issued while an automatic one is
        // running must be parked as `pending_request` and replayed when the
        // running job finishes, instead of spawning a concurrent job.
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        // Five overlapping L0 files so the picker has something to compact.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Swap the initial files for a fresh set, as a running job would do.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        // Kick off a regular compaction: one tracked region, one running job.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // No manual request has been parked yet.
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Issue a manual strict-window compaction while the job is in flight.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        // Still a single job: the manual request was parked, not scheduled.
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Finishing the running job replays the parked request as a second job
        // and clears `pending_request`.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
1477
1478 #[tokio::test]
1479 async fn test_compaction_bypass_in_staging_mode() {
1480 let env = SchedulerEnv::new().await;
1481 let (tx, _rx) = mpsc::channel(4);
1482 let mut scheduler = env.mock_compaction_scheduler(tx);
1483
1484 let builder = VersionControlBuilder::new();
1486 let version_control = Arc::new(builder.build());
1487 let region_id = version_control.current().version.metadata.region_id;
1488
1489 let staging_manifest_ctx = {
1491 let manager = RegionManifestManager::new(
1492 version_control.current().version.metadata.clone(),
1493 0,
1494 RegionManifestOptions {
1495 manifest_dir: "".to_string(),
1496 object_store: env.access_layer.object_store().clone(),
1497 compress_type: CompressionType::Uncompressed,
1498 checkpoint_distance: 10,
1499 remove_file_options: Default::default(),
1500 manifest_cache: None,
1501 },
1502 FormatType::PrimaryKey,
1503 &Default::default(),
1504 )
1505 .await
1506 .unwrap();
1507 Arc::new(ManifestContext::new(
1508 manager,
1509 RegionRoleState::Leader(RegionLeaderState::Staging),
1510 ))
1511 };
1512
1513 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1514
1515 let (tx, rx) = oneshot::channel();
1517 scheduler
1518 .schedule_compaction(
1519 region_id,
1520 compact_request::Options::Regular(Default::default()),
1521 &version_control,
1522 &env.access_layer,
1523 OptionOutputTx::new(Some(OutputTx::new(tx))),
1524 &staging_manifest_ctx,
1525 schema_metadata_manager,
1526 1,
1527 )
1528 .await
1529 .unwrap();
1530
1531 let result = rx.await.unwrap();
1532 assert_eq!(result.unwrap(), 0); assert_eq!(0, scheduler.region_status.len());
1534 }
1535
1536 #[tokio::test]
1537 async fn test_add_ddl_request_to_pending() {
1538 let env = SchedulerEnv::new().await;
1539 let (tx, _rx) = mpsc::channel(4);
1540 let mut scheduler = env.mock_compaction_scheduler(tx);
1541 let builder = VersionControlBuilder::new();
1542 let version_control = Arc::new(builder.build());
1543 let region_id = builder.region_id();
1544
1545 scheduler.region_status.insert(
1546 region_id,
1547 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1548 );
1549
1550 let (output_tx, _output_rx) = oneshot::channel();
1551 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1552 region_id,
1553 sender: OptionOutputTx::from(output_tx),
1554 request: crate::request::DdlRequest::EnterStaging(
1555 store_api::region_request::EnterStagingRequest {
1556 partition_directive:
1557 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1558 },
1559 ),
1560 });
1561
1562 assert!(scheduler.has_pending_ddls(region_id));
1563 }
1564
1565 #[tokio::test]
1566 async fn test_pending_ddl_request_failed_on_compaction_failed() {
1567 let env = SchedulerEnv::new().await;
1568 let (tx, _rx) = mpsc::channel(4);
1569 let mut scheduler = env.mock_compaction_scheduler(tx);
1570 let builder = VersionControlBuilder::new();
1571 let version_control = Arc::new(builder.build());
1572 let region_id = builder.region_id();
1573
1574 scheduler.region_status.insert(
1575 region_id,
1576 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1577 );
1578
1579 let (output_tx, output_rx) = oneshot::channel();
1580 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1581 region_id,
1582 sender: OptionOutputTx::from(output_tx),
1583 request: crate::request::DdlRequest::EnterStaging(
1584 store_api::region_request::EnterStagingRequest {
1585 partition_directive:
1586 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1587 },
1588 ),
1589 });
1590
1591 assert!(scheduler.has_pending_ddls(region_id));
1592 scheduler
1593 .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1594
1595 assert!(!scheduler.has_pending_ddls(region_id));
1596 let result = output_rx.await.unwrap();
1597 assert_matches!(result, Err(_));
1598 }
1599
1600 #[tokio::test]
1601 async fn test_pending_ddl_request_failed_on_region_closed() {
1602 let env = SchedulerEnv::new().await;
1603 let (tx, _rx) = mpsc::channel(4);
1604 let mut scheduler = env.mock_compaction_scheduler(tx);
1605 let builder = VersionControlBuilder::new();
1606 let version_control = Arc::new(builder.build());
1607 let region_id = builder.region_id();
1608
1609 scheduler.region_status.insert(
1610 region_id,
1611 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1612 );
1613
1614 let (output_tx, output_rx) = oneshot::channel();
1615 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1616 region_id,
1617 sender: OptionOutputTx::from(output_tx),
1618 request: crate::request::DdlRequest::EnterStaging(
1619 store_api::region_request::EnterStagingRequest {
1620 partition_directive:
1621 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1622 },
1623 ),
1624 });
1625
1626 assert!(scheduler.has_pending_ddls(region_id));
1627 scheduler.on_region_closed(region_id);
1628
1629 assert!(!scheduler.has_pending_ddls(region_id));
1630 let result = output_rx.await.unwrap();
1631 assert_matches!(result, Err(_));
1632 }
1633
1634 #[tokio::test]
1635 async fn test_pending_ddl_request_failed_on_region_dropped() {
1636 let env = SchedulerEnv::new().await;
1637 let (tx, _rx) = mpsc::channel(4);
1638 let mut scheduler = env.mock_compaction_scheduler(tx);
1639 let builder = VersionControlBuilder::new();
1640 let version_control = Arc::new(builder.build());
1641 let region_id = builder.region_id();
1642
1643 scheduler.region_status.insert(
1644 region_id,
1645 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1646 );
1647
1648 let (output_tx, output_rx) = oneshot::channel();
1649 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1650 region_id,
1651 sender: OptionOutputTx::from(output_tx),
1652 request: crate::request::DdlRequest::EnterStaging(
1653 store_api::region_request::EnterStagingRequest {
1654 partition_directive:
1655 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1656 },
1657 ),
1658 });
1659
1660 assert!(scheduler.has_pending_ddls(region_id));
1661 scheduler.on_region_dropped(region_id);
1662
1663 assert!(!scheduler.has_pending_ddls(region_id));
1664 let result = output_rx.await.unwrap();
1665 assert_matches!(result, Err(_));
1666 }
1667
1668 #[tokio::test]
1669 async fn test_pending_ddl_request_failed_on_region_truncated() {
1670 let env = SchedulerEnv::new().await;
1671 let (tx, _rx) = mpsc::channel(4);
1672 let mut scheduler = env.mock_compaction_scheduler(tx);
1673 let builder = VersionControlBuilder::new();
1674 let version_control = Arc::new(builder.build());
1675 let region_id = builder.region_id();
1676
1677 scheduler.region_status.insert(
1678 region_id,
1679 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1680 );
1681
1682 let (output_tx, output_rx) = oneshot::channel();
1683 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1684 region_id,
1685 sender: OptionOutputTx::from(output_tx),
1686 request: crate::request::DdlRequest::EnterStaging(
1687 store_api::region_request::EnterStagingRequest {
1688 partition_directive:
1689 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1690 },
1691 ),
1692 });
1693
1694 assert!(scheduler.has_pending_ddls(region_id));
1695 scheduler.on_region_truncated(region_id);
1696
1697 assert!(!scheduler.has_pending_ddls(region_id));
1698 let result = output_rx.await.unwrap();
1699 assert_matches!(result, Err(_));
1700 }
1701
1702 #[tokio::test]
1703 async fn test_on_compaction_finished_returns_pending_ddl_requests() {
1704 let job_scheduler = Arc::new(VecScheduler::default());
1705 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1706 let (tx, _rx) = mpsc::channel(4);
1707 let mut scheduler = env.mock_compaction_scheduler(tx);
1708 let builder = VersionControlBuilder::new();
1709 let version_control = Arc::new(builder.build());
1710 let region_id = builder.region_id();
1711 let manifest_ctx = env
1712 .mock_manifest_context(version_control.current().version.metadata.clone())
1713 .await;
1714 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1715
1716 scheduler.region_status.insert(
1717 region_id,
1718 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1719 );
1720
1721 let (output_tx, _output_rx) = oneshot::channel();
1722 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1723 region_id,
1724 sender: OptionOutputTx::from(output_tx),
1725 request: crate::request::DdlRequest::EnterStaging(
1726 store_api::region_request::EnterStagingRequest {
1727 partition_directive:
1728 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1729 },
1730 ),
1731 });
1732
1733 let pending_ddls = scheduler
1734 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1735 .await;
1736
1737 assert_eq!(pending_ddls.len(), 1);
1738 assert!(!scheduler.has_pending_ddls(region_id));
1739 assert!(!scheduler.region_status.contains_key(®ion_id));
1740 assert_eq!(job_scheduler.num_jobs(), 0);
1741 }
1742
    #[tokio::test]
    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
        // A parked manual compaction that turns out to be a no-op (empty region)
        // must resolve its waiter with 0 and still hand back the pending DDLs.
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Track the region with a parked manual compaction request.
        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        // Queue a DDL request behind the (simulated) running compaction.
        let (ddl_tx, _ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        // The DDL is returned, the status is dropped, and the manual waiter
        // observes a successful no-op (0 compaction outputs).
        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
    }
1786
1787 #[tokio::test]
1788 async fn test_on_compaction_finished_returns_empty_when_region_absent() {
1789 let env = SchedulerEnv::new().await;
1790 let (tx, _rx) = mpsc::channel(4);
1791 let mut scheduler = env.mock_compaction_scheduler(tx);
1792 let builder = VersionControlBuilder::new();
1793 let region_id = builder.region_id();
1794 let version_control = Arc::new(builder.build());
1795 let manifest_ctx = env
1796 .mock_manifest_context(version_control.current().version.metadata.clone())
1797 .await;
1798 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1799
1800 let pending_ddls = scheduler
1801 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1802 .await;
1803
1804 assert!(pending_ddls.is_empty());
1805 }
1806
    #[tokio::test]
    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
        // If replaying a parked manual compaction fails to schedule (the job
        // scheduler errors), the region status must be removed and both the
        // manual waiter and the queued DDLs must observe the failure.
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        // Enough overlapping L0 files that the replayed request yields a real job.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Track the region with a parked manual compaction request.
        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        // Queue a DDL request behind the (simulated) running compaction.
        let (ddl_tx, ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        // Nothing is handed back, the status is gone, the manual waiter's
        // channel closes without a value, and the DDL waiter gets an error.
        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert!(manual_rx.await.is_err());
        assert_matches!(ddl_rx.await.unwrap(), Err(_));
    }
1862
1863 #[tokio::test]
1864 async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
1865 let env = SchedulerEnv::new().await;
1866 let (tx, _rx) = mpsc::channel(4);
1867 let mut scheduler = env.mock_compaction_scheduler(tx);
1868 let builder = VersionControlBuilder::new();
1869 let version_control = Arc::new(builder.build());
1870 let region_id = builder.region_id();
1871 let manifest_ctx = env
1872 .mock_manifest_context(version_control.current().version.metadata.clone())
1873 .await;
1874 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1875
1876 scheduler.region_status.insert(
1877 region_id,
1878 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1879 );
1880
1881 let pending_ddls = scheduler
1882 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1883 .await;
1884
1885 assert!(pending_ddls.is_empty());
1886 assert!(!scheduler.region_status.contains_key(®ion_id));
1887 }
1888
1889 #[tokio::test]
1890 async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
1891 let env = SchedulerEnv::new()
1892 .await
1893 .scheduler(Arc::new(FailingScheduler));
1894 let (tx, _rx) = mpsc::channel(4);
1895 let mut scheduler = env.mock_compaction_scheduler(tx);
1896 let mut builder = VersionControlBuilder::new();
1897 let end = 1000 * 1000;
1898 let version_control = Arc::new(
1899 builder
1900 .push_l0_file(0, end)
1901 .push_l0_file(10, end)
1902 .push_l0_file(50, end)
1903 .push_l0_file(80, end)
1904 .push_l0_file(90, end)
1905 .build(),
1906 );
1907 let region_id = builder.region_id();
1908 let manifest_ctx = env
1909 .mock_manifest_context(version_control.current().version.metadata.clone())
1910 .await;
1911 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1912
1913 scheduler.region_status.insert(
1914 region_id,
1915 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1916 );
1917
1918 let pending_ddls = scheduler
1919 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
1920 .await;
1921
1922 assert!(pending_ddls.is_empty());
1923 assert!(!scheduler.region_status.contains_key(®ion_id));
1924 }
1925
1926 #[tokio::test]
1927 async fn test_concurrent_memory_competition() {
1928 let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); let barrier = Arc::new(Barrier::new(3));
1930 let mut handles = vec![];
1931
1932 for _i in 0..3 {
1934 let mgr = manager.clone();
1935 let bar = barrier.clone();
1936 let handle = tokio::spawn(async move {
1937 bar.wait().await; mgr.try_acquire(2 * 1024 * 1024)
1939 });
1940 handles.push(handle);
1941 }
1942
1943 let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
1944 .await
1945 .into_iter()
1946 .map(|r| r.unwrap())
1947 .collect();
1948
1949 let succeeded = results.iter().filter(|r| r.is_some()).count();
1951 let failed = results.iter().filter(|r| r.is_none()).count();
1952
1953 assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
1954 assert_eq!(failed, 2, "Expected 2 tasks to fail");
1955
1956 drop(results);
1958 assert_eq!(manager.used_bytes(), 0);
1959 }
1960}