mod buckets;
pub mod compactor;
pub mod picker;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;
mod twcs;
mod window;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
    CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;

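/// Region compaction request.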
pub struct CompactionRequest {
    /// Engine config of the mito engine.
    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters of the compaction request.
    pub(crate) waiters: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    /// Cache manager of the engine.
    pub(crate) cache_manager: CacheManagerRef,
    pub(crate) manifest_ctx: ManifestContextRef,
    pub(crate) listener: WorkerListener,
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    pub(crate) max_parallelism: usize,
}

impl CompactionRequest {
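    /// Returns the region id of the request.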
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}

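/// Tracks and schedules compaction tasks for the regions of a worker.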
pub(crate) struct CompactionScheduler {
    scheduler: SchedulerRef,
    /// Regions that are currently compacting, along with their waiters and
    /// pending requests.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender of the worker that this scheduler belongs to.
    request_sender: Sender<WorkerRequestWithTime>,
    cache_manager: CacheManagerRef,
    engine_config: Arc<MitoConfig>,
    listener: WorkerListener,
    plugins: Plugins,
}

impl CompactionScheduler {
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            listener,
            plugins,
        }
    }

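    /// Schedules a new compaction for the region. If the region is already
    /// compacting, the waiter is merged into the running task (regular
    /// compaction) or stored as a pending request (manual compaction).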
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        let current_state = manifest_ctx.current_state();
        // Skips compaction entirely while the region is in staging mode.
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Region is already compacting; merge the waiter so it is
                    // notified when the running task finishes.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // A manual compaction overrides any previously pending request.
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manual compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // The region can compact directly.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );
        self.region_status.insert(region_id, status);
        let result = self
            .schedule_compaction_request(request, compact_options)
            .await;

        self.listener.on_compaction_scheduled(region_id);
        result
    }

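    /// Notifies the scheduler that the compaction job for `region_id` has
    /// finished successfully, so it can schedule a pending or follow-up
    /// compaction.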
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return;
        };

        // A manual compaction was requested while the region was compacting; run it now.
        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
            let PendingCompaction {
                options,
                waiter,
                max_parallelism,
            } = pending_request;

            let request = status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            );

            if let Err(e) = self.schedule_compaction_request(request, options).await {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
            } else {
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
            }
            return;
        }

        // Otherwise schedule a regular compaction so the region keeps
        // compacting until the picker finds nothing to do.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        if let Err(e) = self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            error!(e; "Failed to schedule next compaction for region {}", region_id);
        }
    }

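    /// Notifies the scheduler that the compaction job for `region_id` has failed.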
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        // Removes the status of the region.
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        // Notifies all waiters of the failure.
        status.on_failure(err);
    }

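    /// Notifies the scheduler that the region is dropped.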
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

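    /// Notifies the scheduler that the region is closed.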
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

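    /// Notifies the scheduler that the region is truncated.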
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

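    /// Schedules a compaction request.
    ///
    /// If the picker finds nothing to compact, it notifies the waiters and
    /// removes the region from the status map.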
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<()> {
        let picker = new_picker(
            &options,
            &request.current_version.options.compaction,
            request.current_version.options.append_mode,
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager,
            max_parallelism,
        } = request;

        let ttl = find_ttl(
            region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_id);
            TimeToLive::default()
        });

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: current_version.options.clone(),
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact; notify the waiters and remove the region
            // from the status map.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            self.region_status.remove(&region_id);
            return Ok(());
        };

        // Tries remote compaction first if it is enabled; falls back to local
        // compaction when scheduling fails and fallback is allowed.
        let waiters = if current_version.options.compaction.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(());
                    }
                    Err(e) => {
                        if !current_version.options.compaction.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // Takes back the waiters so the local compaction task
                        // can notify them.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Creates a local compaction task.
        let mut local_compaction_task = Box::new(CompactionTaskImpl {
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor {}),
        });

        // Submits the task to the local scheduler.
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                local_compaction_task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .map_err(|e| {
                error!(e; "Failed to submit compaction request for region {}", region_id);
                // Removes the status so a later request can be scheduled.
                self.region_status.remove(&region_id);
                e
            })
    }

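    /// Removes the region from the status map and notifies all its waiters of the failure.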
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }
}

impl Drop for CompactionScheduler {
    fn drop(&mut self) {
        for (region_id, status) in self.region_status.drain() {
            // Notifies all waiters that the region is closed.
            status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}

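/// Finds the TTL of the table behind this region.
///
/// Prefers the TTL from the region options; otherwise fetches the schema-level
/// options via the schema metadata manager.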
async fn find_ttl(
    table_id: TableId,
    table_ttl: Option<TimeToLive>,
    schema_metadata_manager: &SchemaMetadataManagerRef,
) -> Result<TimeToLive> {
    // If the table TTL is set in the region options, use it.
    if let Some(table_ttl) = table_ttl {
        return Ok(table_ttl);
    }

    let ttl = tokio::time::timeout(
        crate::config::FETCH_OPTION_TIMEOUT,
        schema_metadata_manager.get_schema_options_by_table_id(table_id),
    )
    .await
    .context(TimeoutSnafu)?
    .context(GetSchemaMetadataSnafu)?
    .and_then(|options| options.ttl)
    .unwrap_or_default()
    .into();

    Ok(ttl)
}

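/// Status of running and pending region compaction tasks.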
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Waiters of the running compaction task.
    waiters: Vec<OutputTx>,
    /// Manual compaction request waiting to run once the current task finishes.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Creates a new [CompactionStatus] with an empty waiter list.
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
        }
    }

    /// Merges the waiter into the waiter list of the running task.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Sets the pending request, notifying the previous one (if any) that it
    /// has been overridden.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    /// Sends the compaction error to all waiters, including the pending request.
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Creates a new compaction request for the compaction picker.
    ///
    /// It moves all current waiters (plus the given `waiter`) into the request.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}

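/// Output of the compaction picker: which input files to compact into which level.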
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level of the output file.
    pub output_level: Level,
    /// Input files to compact.
    pub inputs: Vec<FileHandle>,
    /// Whether to remove deletion markers while merging.
    pub filter_deleted: bool,
    /// Optional time range constraint for the output file.
    pub output_time_range: Option<TimestampRange>,
}

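/// A serializable version of [CompactionOutput] that carries [FileMeta] instead
/// of [FileHandle].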
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    output_level: Level,
    inputs: Vec<FileMeta>,
    filter_deleted: bool,
    output_time_range: Option<TimestampRange>,
}

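/// Builder to create an SST reader that merges the input files of a compaction.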
struct CompactionSstReaderBuilder<'a> {
    metadata: RegionMetadataRef,
    sst_layer: AccessLayerRef,
    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
    append_mode: bool,
    filter_deleted: bool,
    time_range: Option<TimestampRange>,
    merge_mode: MergeMode,
}

impl CompactionSstReaderBuilder<'_> {
    /// Builds an SST reader that scans all columns of the input files.
    async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
        let mut scan_input = ScanInput::new(
            self.sst_layer,
            ProjectionMapper::all(&self.metadata, false)?,
        )
        .with_files(self.inputs.to_vec())
        .with_append_mode(self.append_mode)
        .with_cache(CacheStrategy::Compaction(self.cache))
        // Whether to remove deletion markers from the output.
        .with_filter_deleted(self.filter_deleted)
        // Ignores file-not-found errors during the scan.
        .with_ignore_file_not_found(true)
        .with_merge_mode(self.merge_mode);

        // Restricts the reader to the output time range when one is provided.
        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        SeqScan::new(scan_input, true)
            .build_reader_for_compaction()
            .await
    }
}

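/// Converts the time range into a predicate over the region's time index column.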
fn time_range_to_predicate(
    range: TimestampRange,
    metadata: &RegionMetadataRef,
) -> Result<PredicateGroup> {
    let ts_col = metadata.time_index_column();

    // The time index column always has a timestamp type, so `as_timestamp()`
    // cannot return `None` here.
    let ts_col_unit = ts_col
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();

    let exprs = match (range.start(), range.end()) {
        (Some(start), Some(end)) => {
            vec![
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .gt_eq(ts_to_lit(*start, ts_col_unit)?),
                datafusion_expr::col(ts_col.column_schema.name.clone())
                    .lt(ts_to_lit(*end, ts_col_unit)?),
            ]
        }
        (Some(start), None) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .gt_eq(ts_to_lit(*start, ts_col_unit)?)]
        }
        (None, Some(end)) => {
            vec![datafusion_expr::col(ts_col.column_schema.name.clone())
                .lt(ts_to_lit(*end, ts_col_unit)?)]
        }
        (None, None) => {
            return Ok(PredicateGroup::default());
        }
    };

    let predicate = PredicateGroup::new(metadata, &exprs);
    Ok(predicate)
}

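/// Converts a timestamp to a DataFusion literal in the unit of the time index column.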
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
    let ts = ts
        .convert_to(ts_col_unit)
        .context(TimeRangePredicateOverflowSnafu {
            timestamp: ts,
            unit: ts_col_unit,
        })?;
    let val = ts.value();
    let scalar_value = match ts_col_unit {
        TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
    };
    Ok(datafusion_expr::lit(scalar_value))
}

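/// Finds all expired SSTs across the given levels according to the TTL.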
fn get_expired_ssts(
    levels: &[LevelMeta],
    ttl: Option<TimeToLive>,
    now: Timestamp,
) -> Vec<FileHandle> {
    let Some(ttl) = ttl else {
        return vec![];
    };

    levels
        .iter()
        .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
        .collect()
}

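/// A pending compaction that runs after the in-progress compaction finishes.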
struct PendingCompaction {
    /// Compaction options of the pending request.
    pub(crate) options: compact_request::Options,
    /// Waiter of the pending request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism for the pending compaction.
    pub(crate) max_parallelism: usize,
}

#[cfg(test)]
mod tests {
    use api::v1::region::StrictWindow;
    use common_datasource::compression::CompressionType;
    use tokio::sync::oneshot;

    use super::*;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::ManifestContext;
    use crate::test_util::mock_schema_metadata_manager;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{apply_edit, VersionControlBuilder};

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Nothing to compact in an empty region.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // A region with a single L0 file still has nothing to compact.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Schedules a compaction over five overlapping L0 files.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Should schedule exactly one compaction job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Applies the compaction result, removing the input files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        // Schedules another compaction while the first one is still running;
        // the waiter is merged into the running task.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());

        // On finished, the scheduler should schedule the next compaction.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(!scheduler
            .region_status
            .get(&builder.region_id())
            .unwrap()
            .waiters
            .is_empty());
    }

    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // The region is compacting and has no pending request yet.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler
            .region_status
            .get(&region_id)
            .unwrap()
            .pending_request
            .is_none());

        // A manual compaction scheduled while the region is compacting becomes pending.
        let (tx, _rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // On finished, the pending manual compaction is scheduled.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }

    #[tokio::test]
    async fn test_compaction_bypass_in_staging_mode() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);

        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = version_control.current().version.metadata.region_id;

        // Creates a manifest context whose region is in staging state.
        let staging_manifest_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                },
                Default::default(),
                Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Compaction should be skipped and the waiter notified immediately.
        let (tx, rx) = oneshot::channel();
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &staging_manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();

        let result = rx.await.unwrap();
        assert_eq!(result.unwrap(), 0);
        assert_eq!(0, scheduler.region_status.len());
    }
}