mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

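/// Region compaction request, carrying everything a compaction run needs.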
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    pub(crate) waiters: Vec<OutputTx>,
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
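    /// Returns the region id of the request.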
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

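/// Compaction scheduler tracks and schedules compaction tasks for the regions
/// of a worker.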
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    region_status: HashMap<RegionId, CompactionStatus>,
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    memory_manager: Arc<CompactionMemoryManager>,
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

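    /// Schedules a compaction for the region. Regions in staging mode are
    /// skipped. If the region already has a running compaction, a regular
    /// request merges its waiter into the running task, while a strict-window
    /// request is stored as pending and re-scheduled once the running task
    /// finishes.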
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

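    /// Notifies the scheduler that the compaction job for the region is
    /// finished. Re-schedules a pending manual compaction if there is one;
    /// otherwise tries to schedule the next regular compaction.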
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

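    /// Notifies the scheduler that the compaction job for the region has
    /// failed, and cancels all pending tasks of the region.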
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

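    /// Schedules a compaction request: resolves dynamic options, picks the
    /// input files, and dispatches the task to the remote scheduler or the
    /// local one. If nothing can be picked, notifies all waiters and removes
    /// the region status.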
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let region_id = request.region_id();
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let Some(picker_output) = picker_output else {
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

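        // Prefer remote compaction when it is enabled; fall back to local
        // compaction if scheduling the remote job fails and fallback is allowed.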
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

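        // Run the compaction locally; the estimated size is passed to the
        // memory manager to control the memory usage of the task.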
        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
    }

    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

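/// Finds the compaction options and TTL for a region, merging region-level
/// settings with database-level schema options. Region-level settings take
/// precedence when `compaction_override` is set or a region TTL is present.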
async fn find_dynamic_options(
    table_id: TableId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    if region_options.compaction_override && region_options.ttl.is_some() {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((
            region_options.compaction.clone(),
            region_options.ttl.unwrap(),
        ));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = if region_options.ttl.is_some() {
        debug!(
            "Use region TTL directly for table {}: ttl={:?}",
            table_id, region_options.ttl
        );
        region_options.ttl.unwrap()
    } else {
        db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into()
    };

    let compaction = if !region_options.compaction_override {
        if let Some(schema_opts) = db_options {
            let map: HashMap<String, String> = schema_opts
                .extra_options
                .iter()
                .filter_map(|(k, v)| {
                    if k.starts_with("compaction.") {
                        Some((k.clone(), v.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            if map.is_empty() {
                region_options.compaction.clone()
            } else {
                crate::region::options::RegionOptions::try_from(&map)
                    .map(|o| o.compaction)
                    .unwrap_or_else(|e| {
                        error!(e; "Failed to create RegionOptions from map");
                        region_options.compaction.clone()
                    })
            }
        } else {
            debug!(
                "No schema options for table {}, use region compaction: compaction={:?}",
                table_id, region_options.compaction
            );
            region_options.compaction.clone()
        }
    } else {
        debug!(
            "Compaction options are overridden for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}

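/// Compaction status of a region scheduled by the [CompactionScheduler].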
struct CompactionStatus {
    region_id: RegionId,
    version_control: VersionControlRef,
    access_layer: AccessLayerRef,
    waiters: Vec<OutputTx>,
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

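    /// Merges the waiter into the waiters of the current compaction.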
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

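    /// Sets the pending manual compaction request. A previously pending
    /// request is aborted with a [ManualCompactionOverrideSnafu] error.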
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replacing pending compaction request {:?} with a new one for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

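    /// Aborts all waiters, including a pending request, with the given error.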
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

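    /// Creates a new compaction request from the current status, draining the
    /// accumulated waiters into the request.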
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

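/// One unit of compaction work: a set of input files to merge into a file at
/// the output level.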
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    pub output_level: Level,
    pub inputs: Vec<FileHandle>,
    pub filter_deleted: bool,
    pub output_time_range: Option<TimestampRange>,
}

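/// A serializable counterpart of [CompactionOutput] that stores [FileMeta]
/// instead of file handles.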
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

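/// Builder for the reader that merges the input SSTs of a compaction.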
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
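    /// Builds a [BoxedBatchReader] that merges the input SSTs for compaction.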
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

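    /// Builds a [BoxedRecordBatchStream] that reads the input SSTs in the
    /// flat format.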
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

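    /// Creates the [ScanInput] shared by both reader variants.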
    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mapper = ProjectionMapper::all(&self.metadata, flat_format)?;
        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode)
            .with_flat_format(flat_format);

        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

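/// Converts the time range into a [PredicateGroup] on the time index column of
/// the region.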
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

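    // Safety: the time index column always has a timestamp type.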
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

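/// Converts a timestamp into a DataFusion literal in the unit of the time
/// index column, failing if the conversion overflows.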
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

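/// Finds SST files in all levels that have expired according to the TTL.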
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

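/// Estimates the memory needed to compact the picked files by summing the
/// largest uncompressed row group size of every input file.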
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

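/// A manual compaction request deferred until the running compaction task
/// finishes.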
struct PendingCompaction {
    pub(crate) options: compact_request::Options,
    pub(crate) waiter: OptionOutputTx,
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_time::DatabaseTimeToLive;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
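        // Budget of 3 MiB while each of the three tasks tries to acquire
        // 2 MiB, so exactly one acquisition can succeed.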
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024));
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await;
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}