mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{CompactionTask, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, BoxedRecordBatchStream};
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

/// Region compaction request.
pub struct CompactionRequest {
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to send requests to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

/// Compaction scheduler tracks and manages compaction tasks.
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Compacting regions.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    /// Plugins of the engine.
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

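    /// Schedules a new compaction task for the region.
    ///
    /// If the region already has a compaction running, a `Regular` request only
    /// registers its waiter, while a `StrictWindow` (manual) request is stored
    /// as the pending request and re-scheduled once the running task finishes.
    /// Regions in staging mode skip compaction and resolve the waiter with
    /// `Ok(0)` immediately.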
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        // Skip compaction while the region is in staging mode.
        let current_state = manifest_ctx.current_state();
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is compacting. Add the waiter to the pending list.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // A manual compaction replaces any previous pending request.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

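    /// Notifies the scheduler that the compaction job for the region has finished.
    ///
    /// If a manual compaction is pending, it is re-scheduled first; otherwise the
    /// scheduler tries to pick the next regular compaction for the region.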
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // Schedule the next compaction if there are still files to compact.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );
        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

    /// Notifies the scheduler that the compaction job has failed.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Fast fail: cancels all pending tasks and sends the error to their waiters.
        status.on_failure(err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

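    /// Schedules a compaction request.
    ///
    /// When the picker finds nothing to compact, all waiters are resolved with
    /// `Ok(0)` and the region is removed from `region_status`. If remote
    /// compaction is enabled, the job is first handed to the remote scheduler
    /// and only falls back to a local task when the options allow it.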
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact: resolve all waiters and remove the region status.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // Try the remote job scheduler first if remote compaction is enabled.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Take the waiters back so the local compaction can notify them.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Create a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submit the compaction task to the local scheduler.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // If the submission fails, remove the region from the status map.
                self.region_status.remove(&region_id);
                e
            })
    }

    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all pending waiters.
        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // Notify all waiters on drop to avoid hanging them forever.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

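/// Finds the TTL of the table that the region belongs to.
///
/// A TTL in the region options takes precedence; only when it is absent does
/// the function query the schema options (bounded by `FETCH_OPTION_TIMEOUT`)
/// to resolve the table-level TTL.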
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If the TTL is set in the region options, use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

/// Status of running and pending region compaction tasks.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters of the in-progress compaction.
    waiters: Vec<OutputTx>,
    /// Pending manual compaction request, re-scheduled after the in-progress one finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus].
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiters of the in-progress compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending request, replacing and failing any previous one.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    /// Sends the error to all waiters, including the pending request's waiter.
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the region.
    ///
    /// It consumes all current waiters of the status.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

/// Output of a compaction picker, describing the input files and output level.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level of the output file.
    pub output_level: Level,
    /// Input files to compact.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers while compacting.
    pub filter_deleted: bool,
    /// Time range of the output file.
    pub output_time_range: Option<TimestampRange>,
}

/// A serializable version of [CompactionOutput] that stores [FileMeta] instead of [FileHandle].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

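/// Builder to create an SST reader that merges all input files of a compaction,
/// either as a [BoxedBatchReader] (primary-key format) or as a
/// [BoxedRecordBatchStream] (flat format).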
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds an SST reader that reads the input files in primary-key format.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let scan_input = self.build_scan_input(false)?.with_compaction(true);

        SeqScan::new(scan_input).build_reader_for_compaction().await
    }

    /// Builds an SST reader that reads the input files in flat format.
    async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
        let scan_input = self.build_scan_input(true)?.with_compaction(true);

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
    }

    fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, flat_format)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        .with_cache(CacheStrategy::Compaction(self.cache))
        .with_filter_deleted(self.filter_deleted)
        // Ignore files that are deleted concurrently during compaction.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode)
        .with_flat_format(flat_format);

        // Converts the output time range into a predicate on the time index.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}

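/// Converts a [TimestampRange] into a predicate group on the time index column.
///
/// The range maps to the half-open bounds `ts >= start` and `ts < end`; a
/// missing bound drops the corresponding expression, and a fully unbounded
/// range yields an empty [PredicateGroup].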
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // Safety: the time index column always has a timestamp type.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
            ]
        }
        (None, Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs)?;
    Ok(predicate)
}

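/// Converts a timestamp into a DataFusion literal in the unit of the time
/// index column, e.g. `1000` ms against a second-precision column becomes
/// `TimestampSecond(Some(1), None)`. Returns an error if the conversion
/// overflows.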
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

/// Finds all expired SSTs across levels.
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

/// A pending compaction request that waits for the in-progress compaction to finish.
struct PendingCompaction {
    /// Compaction options.
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism of the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::sst::FormatType;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{VersionControlBuilder, apply_edit};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Nothing to compact.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // A single L0 file, the picker has nothing to compact.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

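    // A minimal sketch added for illustration: `find_ttl` should prefer the TTL
    // carried by the region options and return it without querying the schema
    // metadata manager (the table id below is never registered, so the early
    // return is the only way this call can succeed). Assumes `TimeToLive`
    // implements `PartialEq`.
    #[tokio::test]
    async fn test_find_ttl_prefers_region_ttl() {
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
        let ttl = find_ttl(1024, Some(TimeToLive::default()), &schema_metadata_manager)
            .await
            .unwrap();
        assert_eq!(ttl, TimeToLive::default());
    }
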
    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Applies an edit to simulate the compaction result.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // The region is compacting, so the new request only adds a waiter.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // On compaction finished, the scheduler should schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // Applies another edit without removing files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // The region is still compacting, so this request again only adds a waiter.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // 5 files to compact.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Applies an edit to simulate the compaction result.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule 1 compaction without any pending request.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        // Schedules a manual compaction while the region is compacting.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The manual compaction is stored as the pending request.
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On compaction finished, the pending manual compaction should be scheduled.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Creates a manifest context in staging mode.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
                FormatType::PrimaryKey,
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Compaction should be skipped and the waiter resolved with 0 immediately.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }
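
    // A minimal sketch added for illustration: `ts_to_lit` converts the timestamp
    // into the unit of the time index column before building the literal, so
    // 1000 milliseconds against a second-precision column become 1 second.
    #[test]
    fn test_ts_to_lit_converts_unit() {
        let expr = ts_to_lit(Timestamp::new_millisecond(1000), TimeUnit::Second).unwrap();
        assert_eq!(
            expr,
            datafusion_expr::lit(ScalarValue::TimestampSecond(Some(1), None))
        );
    }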
}