mod buckets;
pub mod compactor;
pub mod picker;
mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

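/// Region compaction request.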
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
    pub(crate) waiters: Vec<OutputTx>,
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

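/// Compaction scheduler tracks and schedules compaction tasks for regions.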
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    region_status: HashMap<RegionId, CompactionStatus>,
    request_sender: Sender<WorkerRequest>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequest>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

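    /// Schedules a compaction for the region.
    ///
    /// If the region already has a compaction running, a regular request only merges
    /// its waiter into the existing status, while a strict-window (manual) request is
    /// kept as pending and re-scheduled once the running compaction finishes.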
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

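    /// Notifies the scheduler that the compaction job for the region is finished.
    ///
    /// Schedules the pending manual compaction if there is one, otherwise tries to
    /// schedule the next regular compaction for the region.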
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

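    /// Notifies the scheduler that the compaction job for the region failed,
    /// removing the region and cancelling all its pending waiters.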
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

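    /// Creates a compaction task for the request and submits it to the scheduler.
    ///
    /// If the picker finds nothing to compact, notifies all waiters with `Ok(0)` and
    /// removes the region from the scheduler.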
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            region_dir: access_layer.region_dir().to_string(),
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

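        // Prefer remote compaction when the region enables it and a remote job scheduler
        // is available; fall back to local compaction (reusing the original waiters) when
        // remote scheduling is unavailable or fails and fallback is allowed.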
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

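/// Finds the TTL to use for the region: the region's own TTL option if set, otherwise
/// the TTL from the schema options fetched by table id.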
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

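/// Compaction status of a region, tracking the waiters of the running compaction and
/// any pending manual compaction request.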
struct CompactionStatus {
    region_id: RegionId,
    version_control: VersionControlRef,
    access_layer: AccessLayerRef,
    waiters: Vec<OutputTx>,
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
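    /// Creates a new [CompactionStatus].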
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

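    /// Merges the waiter into the waiters of the running compaction.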
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

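    /// Sets the pending manual compaction request, failing the waiter of any previously
    /// pending request with a manual compaction override error.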
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(mut prev) = self.pending_request.replace(pending) {
            debug!(
                "Replacing pending compaction request {:?} with a new one for region: {}",
                prev.options, self.region_id
            );
            if let Some(waiter) = prev.waiter.take_inner() {
                waiter.send(ManualCompactionOverrideSnafu.fail());
            }
        }
    }

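    /// Fails all waiters, including the waiter of a pending request, with the given error.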
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

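    /// Creates a new [CompactionRequest] from the current version, moving all queued
    /// waiters (plus the optional new waiter) into the request.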
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequest>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

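/// Output of a compaction job: the input files to merge and the target output level.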
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    pub output_level: Level,
    pub inputs: Vec<FileHandle>,
    pub filter_deleted: bool,
    pub output_time_range: Option<TimestampRange>,
}

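/// A serializable version of [CompactionOutput] that stores [FileMeta] instead of [FileHandle].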
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

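/// Builder for the reader that merges rows from the input SST files of a compaction.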
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
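    /// Builds an SST reader that scans the input files for compaction.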
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode);

        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        SeqScan::new(scan_input, true)
            .build_reader_for_compaction()
            .await
    }
}

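/// Converts a [TimestampRange] into a predicate on the time index column.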
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .gt_eq(ts_to_lit(*start, ts_col_unit)?)]
        }
        (None, Some(end)) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .lt(ts_to_lit(*end, ts_col_unit)?)]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs);
    Ok(predicate)
}

fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

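/// Finds expired SSTs across all levels according to the TTL.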
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

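/// A manual compaction request that arrived while the region was already compacting and
/// will be scheduled after the running compaction finishes.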
struct PendingCompaction {
    pub(crate) options: compact_request::Options,
    pub(crate) waiter: OptionOutputTx,
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use tokio::sync::oneshot;

    use super::*;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler
            .region_status
            .get(&region_id)
            .unwrap()
            .pending_request
            .is_none());

        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
}