mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

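/// Region compaction request passed to the compaction scheduler.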
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    pub(crate) waiters: Vec<OutputTx>,
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

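/// Schedules compaction tasks and tracks the compaction status of each region.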
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    region_status: HashMap<RegionId, CompactionStatus>,
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    memory_manager: Arc<CompactionMemoryManager>,
    memory_policy: OnExhaustedPolicy,
    listener: WorkerListener,
    plugins: Plugins,
}

impl CompactionScheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

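    /// Schedules a compaction for the region.
    ///
    /// If the region already has a running compaction, a regular request only
    /// merges its waiter into the existing status, while a strict-window
    /// (manual) request is stored as a pending request and re-scheduled once
    /// the running compaction finishes. Compaction is skipped for regions in
    /// staging mode.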
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

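    /// Notifies the scheduler that the compaction job of the region is finished.
    ///
    /// Schedules a pending manual compaction if there is one, otherwise tries
    /// to schedule the next regular compaction for the region.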
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

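    /// Notifies the scheduler that the compaction job of the region failed and
    /// cancels all waiters of that region.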
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

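    /// Picks compaction outputs for the request and submits a compaction task.
    ///
    /// Uses the remote job scheduler when remote compaction is enabled and
    /// falls back to a local task if permitted; if the picker finds nothing to
    /// compact, notifies the waiters and removes the region status.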
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        let estimated_bytes = estimate_compaction_bytes(&picker_output);
        let local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
    }

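    /// Submits the local compaction task to the background scheduler, removing
    /// the region status if submission fails.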
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

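/// Finds the TTL of the table. Returns the region level TTL if it is set,
/// otherwise fetches the schema level options by table id.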
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

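/// Per-region compaction state tracked by the scheduler, including waiters of
/// the running compaction and an optional pending manual compaction request.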
struct CompactionStatus {
    region_id: RegionId,
    version_control: VersionControlRef,
    access_layer: AccessLayerRef,
    waiters: Vec<OutputTx>,
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

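    /// Creates a new compaction request for the compaction picker, draining all
    /// pending waiters of this status together with the given waiter.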
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

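/// Output of a compaction: the input files to merge and the level and time
/// range of the file to produce.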
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    pub output_level: Level,
    pub inputs: Vec<FileHandle>,
    pub filter_deleted: bool,
    pub output_time_range: Option<TimestampRange>,
}

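/// Serializable version of [CompactionOutput] that stores [FileMeta] instead of
/// [FileHandle].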
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

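/// Builder of the SST reader that merges the input files of a compaction.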
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

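/// Converts a time range into a predicate on the time index column, so the
/// compaction reader only returns rows within the output time range.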
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }

        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

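/// Finds expired SSTs across all levels according to the TTL.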
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

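/// Estimates the memory required by a compaction as the sum of the maximum
/// uncompressed row group size of every input file in the picker output.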
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
    picker_output
        .outputs
        .iter()
        .flat_map(|output| output.inputs.iter())
        .map(|file: &FileHandle| {
            let meta = file.meta_ref();
            meta.max_row_group_uncompressed_size
        })
        .sum()
}

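/// A manual compaction request deferred until the running compaction of the
/// region finishes.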
struct PendingCompaction {
    pub(crate) options: compact_request::Options,
    pub(crate) waiter: OptionOutputTx,
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::{Barrier, oneshot};

    use super::*;
    use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }

    #[tokio::test]
    async fn test_concurrent_memory_competition() {
        let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024));
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];

        for _i in 0..3 {
            let mgr = manager.clone();
            let bar = barrier.clone();
            let handle = tokio::spawn(async move {
                bar.wait().await;
                mgr.try_acquire(2 * 1024 * 1024)
            });
            handles.push(handle);
        }

        let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        let succeeded = results.iter().filter(|r| r.is_some()).count();
        let failed = results.iter().filter(|r| r.is_none()).count();

        assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
        assert_eq!(failed, 2, "Expected 2 tasks to fail");

        drop(results);
        assert_eq!(manager.used_bytes(), 0);
    }
}