mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{CompactionTask, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager of the engine.
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Tracks compaction status of regions and schedules compaction tasks.
pub(crate) struct CompactionScheduler {
    /// Background job scheduler that runs compaction tasks.
    scheduler: SchedulerRef,
    /// Status of regions that are compacting or have pending requests.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager of the engine.
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    /// Plugins of the engine.
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region. If the region is already
    /// compacting, a regular request merges its waiter into the running task,
    /// while a manual request is queued and re-scheduled afterwards.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction entirely while the region is in staging mode.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // A manual compaction replaces any previously pending request.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Notifies the scheduler that the compaction job has finished. Re-schedules
    /// a pending manual compaction if there is one; otherwise checks whether the
    /// region needs another regular compaction.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // The region may still have files to compact; try to schedule again.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Remove the region and fail all pending waiters.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Schedules a compaction request.
    ///
    /// If the region has nothing to compact, it removes the region from the
    /// status map and notifies every waiter with `Ok(0)`.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact. Notify all waiters and forget the region.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // Prefer remote compaction when enabled; fall back to local compaction
        // when it is unavailable or fails (if fallback is allowed).
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Take the waiters back for the local compaction task.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task and submit it to the background scheduler.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // The task is not submitted; stop tracking the region.
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        // Remove the status of the region and fail all waiters.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // The scheduler is dropped, so regions are considered closed.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

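/// Finds the TTL to apply for a region of table `table_id`.
///
/// The region-level TTL takes precedence; otherwise the TTL is fetched from
/// the table's schema options with a timeout. A sketch of the precedence
/// (the values below are illustrative assumptions, not defaults):
///
/// ```text
/// region ttl = Some(7d), schema ttl = 1d    =>  7d
/// region ttl = None,     schema ttl = 1d    =>  1d
/// region ttl = None,     schema ttl = unset =>  TimeToLive::default()
/// ```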
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters of the running compaction.
    waiters: Vec<OutputTx>,
    /// Pending manual compaction request, if any.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiter list of the running compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending manual compaction request, failing the waiter of any
    /// previously pending request.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    /// Fails all waiters, including the waiter of the pending request.
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It consumes all pending waiters in the status.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level of the output file.
    pub output_level: Level,
    /// Input files to compact.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers from the output.
    pub filter_deleted: bool,
    /// Time range of the compaction output file.
    pub output_time_range: Option<TimestampRange>,
}

/// Serializable version of [CompactionOutput], used by remote compaction jobs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

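/// Builder for an SST reader that merges the input files of a compaction.
///
/// A minimal usage sketch (the field values here are illustrative
/// assumptions, not engine defaults):
///
/// ```text
/// CompactionSstReaderBuilder {
///     metadata,
///     sst_layer,
///     cache,
///     inputs: &input_files,
///     append_mode: false,
///     filter_deleted: true,
///     time_range: None,
///     merge_mode: MergeMode::LastRow,
/// }
/// .build_sst_reader()
/// .await?;
/// ```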
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds a batch reader over the input SSTs.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds a record batch stream over the input SSTs in flat format.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // Input files may have been purged; ignore missing files during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // Restrict the scan to the output time window, if any.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

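/// Builds a [PredicateGroup] from a timestamp range over the region's time
/// index column. A half-open range `[start, end)` becomes the pair of
/// expressions `ts >= start` and `ts < end`; an unbounded side drops its
/// comparison, and a fully unbounded range yields an empty predicate group.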
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column must have a timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

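/// Converts a timestamp into a DataFusion literal of the time index column's
/// unit. Converting to a finer unit (e.g. seconds to nanoseconds) multiplies
/// the value and can overflow for large timestamps, in which case this
/// returns a `TimeRangePredicateOverflow` error.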
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels according to the TTL.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// A manual compaction request that waits for the running compaction to finish.
struct PendingCompaction {
    /// Compaction options of the pending request.
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // A region with a single file has nothing to compact either.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Schedule a compaction for 5 files.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // One region is compacting with one submitted job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Apply the compaction edit to the version control.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // A new request while compacting only adds a waiter.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On finish, the scheduler submits the next compaction job.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Apply an edit that removes no files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Schedule a compaction for 5 files.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Apply the compaction edit to the version control.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // One region is compacting; no pending request yet.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // A manual compaction while compacting becomes a pending request.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On finish, the pending manual compaction is scheduled.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Create a manifest context in staging state.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Compaction on a staging region is skipped.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        // The waiter receives 0 and the scheduler tracks nothing.
        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }
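
    // Minimal sanity sketches for the pure helpers above. These assume
    // common_time's `Timestamp::new_second`/`new_millisecond` constructors.

    #[test]
    fn test_ts_to_lit_converts_unit() {
        // A second-precision timestamp against a millisecond time index
        // column becomes a millisecond literal.
        let expr = ts_to_lit(Timestamp::new_second(1), TimeUnit::Millisecond).unwrap();
        assert_eq!(
            expr,
            datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(1000), None))
        );
    }

    #[test]
    fn test_get_expired_ssts_without_ttl() {
        // Without a TTL, no SST is ever considered expired.
        let expired = get_expired_ssts(&[], None, Timestamp::new_millisecond(0));
        assert!(expired.is_empty());
    }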
}