mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
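
/// Region compaction request.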
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    pub(crate) waiters: Vec<OutputTx>,
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}
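
/// Tracks the compaction status of regions and schedules compaction tasks for them.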
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    region_status: HashMap<RegionId, CompactionStatus>,
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    memory_manager: Arc<CompactionMemoryManager>,
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }
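
    /// Schedules a compaction for the region.
    ///
    /// Compactions are skipped while the region is in staging mode. If the region is
    /// already compacting, a regular request is merged into the running one, while a
    /// strict window request is queued as pending and re-scheduled once the current
    /// compaction finishes.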
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }
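
    /// Notifies the scheduler that the compaction job for the region has finished.
    ///
    /// Re-schedules a pending manual compaction if any, otherwise tries to schedule
    /// the next regular compaction for the region.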
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }
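
    /// Notifies the scheduler that a compaction job for the region has failed and
    /// cancels all waiters of the region with the error.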
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }
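
    /// Resolves the compaction options for the region, picks the files to compact and
    /// schedules the compaction task, preferring a remote compaction job when the
    /// options allow it and falling back to a local task otherwise.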
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let region_id = request.region_id();
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
    }

    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}
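
/// Resolves the compaction options and TTL to use for a region, merging region level
/// options with database level schema options fetched from the metadata manager.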
async fn find_dynamic_options(
    table_id: TableId,
    region_options: &crate::region::options::RegionOptions,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
    if region_options.compaction_override && region_options.ttl.is_some() {
        debug!(
            "Use region options directly for table {}: compaction={:?}, ttl={:?}",
            table_id, region_options.compaction, region_options.ttl
        );
        return Ok((
            region_options.compaction.clone(),
            region_options.ttl.unwrap(),
        ));
    }

    let db_options = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?;

    let ttl = if region_options.ttl.is_some() {
        debug!(
            "Use region TTL directly for table {}: ttl={:?}",
            table_id, region_options.ttl
        );
        region_options.ttl.unwrap()
    } else {
        db_options
            .as_ref()
            .and_then(|options| options.ttl)
            .unwrap_or_default()
            .into()
    };

    let compaction = if !region_options.compaction_override {
        if let Some(schema_opts) = db_options {
            let map: HashMap<String, String> = schema_opts
                .extra_options
                .iter()
                .filter_map(|(k, v)| {
                    if k.starts_with("compaction.") {
                        Some((k.clone(), v.clone()))
                    } else {
                        None
                    }
                })
                .collect();
            if map.is_empty() {
                region_options.compaction.clone()
            } else {
                crate::region::options::RegionOptions::try_from(&map)
                    .map(|o| o.compaction)
                    .unwrap_or_else(|e| {
                        error!(e; "Failed to create RegionOptions from map");
                        region_options.compaction.clone()
                    })
            }
        } else {
            debug!(
                "DB options is None for table {}, use region compaction: compaction={:?}",
                table_id, region_options.compaction
            );
            region_options.compaction.clone()
        }
    } else {
        debug!(
            "Compaction options are overridden for table {}, use region compaction: compaction={:?}",
            table_id, region_options.compaction
        );
        region_options.compaction.clone()
    };

    debug!(
        "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
        table_id, compaction, ttl
    );
    Ok((compaction, ttl))
}
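
/// Status of an ongoing region compaction tracked by the scheduler, including waiters
/// of the current compaction and an optional pending manual request.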
struct CompactionStatus {
    region_id: RegionId,
    version_control: VersionControlRef,
    access_layer: AccessLayerRef,
    waiters: Vec<OutputTx>,
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction {:?} with new request for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }
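
    /// Builds a new [CompactionRequest] for the region, taking over all waiters
    /// currently registered in this status.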
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}
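
/// A set of input files to compact into one output at `output_level`.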
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    pub output_level: Level,
    pub inputs: Vec<FileHandle>,
    pub filter_deleted: bool,
    pub output_time_range: Option<TimestampRange>,
}
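
/// Serializable counterpart of [CompactionOutput], carrying [FileMeta] instead of [FileHandle].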
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}
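
/// Builder to create an SST reader that merges the input files of a compaction.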
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}
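
/// Converts a timestamp range into predicates on the time index column of the region.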
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}
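
/// Converts a timestamp to a DataFusion literal in the unit of the time index column.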
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}
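
/// Returns all files in the levels that are expired according to the TTL.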
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}
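
/// Estimates the memory required by a compaction by summing the largest uncompressed
/// row group size of each input file.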
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}
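
/// A manual compaction request queued while another compaction of the region is running.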
struct PendingCompaction {
    pub(crate) options: compact_request::Options,
    pub(crate) waiter: OptionOutputTx,
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use common_meta::key::schema_name::SchemaNameValue;
    use common_time::DatabaseTimeToLive;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024));
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await;
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}