1mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::{Arc, Mutex};
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use common_base::Plugins;
33use common_base::cancellation::CancellationHandle;
34use common_memory_manager::OnExhaustedPolicy;
35use common_meta::key::SchemaMetadataManagerRef;
36use common_telemetry::{debug, error, info, warn};
37use common_time::range::TimestampRange;
38use common_time::timestamp::TimeUnit;
39use common_time::{TimeToLive, Timestamp};
40use datafusion_common::ScalarValue;
41use datafusion_expr::Expr;
42use serde::{Deserialize, Serialize};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{RegionId, TableId};
46use task::MAX_PARALLEL_COMPACTION;
47use tokio::sync::mpsc::{self, Sender};
48
49use crate::access_layer::AccessLayerRef;
50use crate::cache::{CacheManagerRef, CacheStrategy};
51use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
52use crate::compaction::memory_manager::CompactionMemoryManager;
53use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
54use crate::compaction::task::CompactionTaskImpl;
55use crate::config::MitoConfig;
56use crate::error::{
57 CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
58 ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
59 RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
60};
61use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
62use crate::read::FlatSource;
63use crate::read::flat_projection::FlatProjectionMapper;
64use crate::read::scan_region::{PredicateGroup, ScanInput};
65use crate::read::seq_scan::SeqScan;
66use crate::region::options::{MergeMode, RegionOptions};
67use crate::region::version::VersionControlRef;
68use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
69use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
70use crate::schedule::remote_job_scheduler::{
71 CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
72};
73use crate::schedule::scheduler::SchedulerRef;
74use crate::sst::file::{FileHandle, FileMeta, Level};
75use crate::sst::version::LevelMeta;
76use crate::worker::WorkerListener;
77
/// Region compaction request assembled by the worker before scheduling.
pub struct CompactionRequest {
    /// Engine configuration shared across regions.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Snapshot of the region version to compact.
    pub(crate) current_version: CompactionVersion,
    /// Layer to access SST files of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Sender to notify the region worker when compaction finishes.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Waiters to notify with the compaction result.
    pub(crate) waiters: Vec<OutputTx>,
    /// Time when the request was created.
    pub(crate) start_time: Instant,
    /// Cache manager of the engine.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context to access and update the region manifest.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Worker event listener.
    pub(crate) listener: WorkerListener,
    /// Manager to fetch schema (database) level options.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Max parallelism allowed for the compaction task.
    pub(crate) max_parallelism: usize,
}
95
impl CompactionRequest {
    /// Returns the id of the region to compact.
    pub(crate) fn region_id(&self) -> RegionId {
        self.current_version.metadata.region_id
    }
}
101
/// Per-worker scheduler that tracks and schedules region compactions.
pub(crate) struct CompactionScheduler {
    /// Background job scheduler that runs local compaction tasks.
    scheduler: SchedulerRef,
    /// Compaction status of tracked regions; a region is present only while it
    /// has a running compaction or pending waiters/requests.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Sender to notify the region worker of compaction results.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager of the engine.
    cache_manager: CacheManagerRef,
    /// Engine configuration.
    engine_config: Arc<MitoConfig>,
    /// Memory manager that limits compaction memory usage.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Policy applied when compaction memory is exhausted.
    memory_policy: OnExhaustedPolicy,
    /// Worker event listener.
    listener: WorkerListener,
    /// Plugins, used to look up an optional remote job scheduler.
    plugins: Plugins,
}
117
impl CompactionScheduler {
    /// Creates a new compaction scheduler.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: SchedulerRef,
        request_sender: Sender<WorkerRequestWithTime>,
        cache_manager: CacheManagerRef,
        engine_config: Arc<MitoConfig>,
        listener: WorkerListener,
        plugins: Plugins,
        memory_manager: Arc<CompactionMemoryManager>,
        memory_policy: OnExhaustedPolicy,
    ) -> Self {
        Self {
            scheduler,
            region_status: HashMap::new(),
            request_sender,
            cache_manager,
            engine_config,
            memory_manager,
            memory_policy,
            listener,
            plugins,
        }
    }

    /// Schedules a compaction for the region.
    ///
    /// If the region already has a running compaction:
    /// - a regular request only merges its waiter into the running one;
    /// - a strict-window (manual) request is stored as the pending request and
    ///   re-scheduled after the running compaction finishes.
    ///
    /// Regions in staging mode are skipped and the waiter receives `Ok(0)`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn schedule_compaction(
        &mut self,
        region_id: RegionId,
        compact_options: compact_request::Options,
        version_control: &VersionControlRef,
        access_layer: &AccessLayerRef,
        waiter: OptionOutputTx,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> Result<()> {
        let current_state = manifest_ctx.current_state();
        // Skip compaction while the region leader is staging.
        if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
            info!(
                "Skipping compaction for region {} in staging mode, options: {:?}",
                region_id, compact_options
            );
            waiter.send(Ok(0));
            return Ok(());
        }

        if let Some(status) = self.region_status.get_mut(&region_id) {
            match compact_options {
                Options::Regular(_) => {
                    // Attach the waiter to the already running compaction.
                    status.merge_waiter(waiter);
                }
                options @ Options::StrictWindow(_) => {
                    // Remember the manual request; it replaces any previous
                    // pending one (whose waiter is failed inside).
                    status.set_pending_request(PendingCompaction {
                        options,
                        waiter,
                        max_parallelism,
                    });
                    info!(
                        "Region {} is compacting, manually compaction will be re-scheduled.",
                        region_id
                    );
                }
            }
            return Ok(());
        }

        // No running compaction for this region: build and schedule a request.
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            waiter,
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            max_parallelism,
        );

        let result = match self
            .schedule_compaction_request(request, compact_options)
            .await
        {
            Ok(Some(active_compaction)) => {
                // Track the region only when a task was actually scheduled.
                status.active_compaction = Some(active_compaction);
                self.region_status.insert(region_id, status);

                Ok(())
            }
            Ok(None) => Ok(()),
            Err(e) => Err(e),
        };

        self.listener.on_compaction_scheduled(region_id);
        result
    }

    /// Tries to re-schedule the region's pending (manual) compaction, if any.
    ///
    /// Returns `true` when the region is untracked, the pending request was
    /// scheduled, or scheduling failed (the region is removed on failure).
    /// Returns `false` when there is no pending request or the pending request
    /// produced no compaction task.
    pub(crate) async fn handle_pending_compaction_request(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> bool {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return true;
        };

        let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
            return false;
        };

        let PendingCompaction {
            options,
            waiter,
            max_parallelism,
        } = pending_request;

        let request = {
            status.new_compaction_request(
                self.request_sender.clone(),
                waiter,
                self.engine_config.clone(),
                self.cache_manager.clone(),
                manifest_ctx,
                self.listener.clone(),
                schema_metadata_manager,
                max_parallelism,
            )
        };

        match self.schedule_compaction_request(request, options).await {
            Ok(Some(active_compaction)) => {
                // The status was fetched above and is only removed on failure,
                // so it must still be present here.
                let status = self.region_status.get_mut(&region_id).unwrap();
                status.active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled manual compaction for region id: {}",
                    region_id
                );
                true
            }
            Ok(None) => {
                false
            }
            Err(e) => {
                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
                true
            }
        }
    }

    /// Notifies the scheduler that a compaction job has finished.
    ///
    /// Clears the running task, prioritizes any pending manual compaction,
    /// notifies waiters with `Ok(0)`, and either returns queued DDL requests
    /// for the worker to process or schedules the next regular compaction.
    pub(crate) async fn on_compaction_finished(
        &mut self,
        region_id: RegionId,
        manifest_ctx: &ManifestContextRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return Vec::new();
        };
        status.clear_running_task();

        // A pending manual compaction takes precedence over waiters and DDLs.
        if self
            .handle_pending_compaction_request(
                region_id,
                manifest_ctx,
                schema_metadata_manager.clone(),
            )
            .await
        {
            return Vec::new();
        }

        // Re-fetch: the status may have been removed while handling the
        // pending request.
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return Vec::new();
        };

        for waiter in std::mem::take(&mut status.waiters) {
            waiter.send(Ok(0));
        }

        // Queued DDL requests take priority over the next compaction round.
        let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
        if !pending_ddl_requests.is_empty() {
            self.region_status.remove(&region_id);
            return pending_ddl_requests;
        }

        // Try to schedule a follow-up regular compaction without a waiter.
        let request = status.new_compaction_request(
            self.request_sender.clone(),
            OptionOutputTx::none(),
            self.engine_config.clone(),
            self.cache_manager.clone(),
            manifest_ctx,
            self.listener.clone(),
            schema_metadata_manager,
            MAX_PARALLEL_COMPACTION,
        );

        match self
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
        {
            Ok(Some(active_compaction)) => {
                self.region_status
                    .get_mut(&region_id)
                    .unwrap()
                    .active_compaction = Some(active_compaction);
                debug!(
                    "Successfully scheduled next compaction for region id: {}",
                    region_id
                );
            }
            Ok(None) => {
                // Nothing more to compact; stop tracking the region.
                self.region_status.remove(&region_id);
            }
            Err(e) => {
                error!(e; "Failed to schedule next compaction for region {}", region_id);
                self.remove_region_on_failure(region_id, Arc::new(e));
            }
        }

        Vec::new()
    }

    /// Notifies the scheduler that a compaction job has been cancelled and
    /// returns the region's pending DDL requests.
    pub(crate) async fn on_compaction_cancelled(
        &mut self,
        region_id: RegionId,
    ) -> Vec<SenderDdlRequest> {
        self.remove_region_on_cancel(region_id)
    }

    /// Notifies the scheduler that a compaction job has failed; fails all
    /// waiters and pending requests of the region.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }

    /// Notifies the scheduler that the region is dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region is closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region is truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Queues a DDL request to run after the region's current compaction.
    ///
    /// # Panics
    /// Panics if the region has no tracked compaction status; callers must
    /// only queue DDLs for regions that are currently compacting.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        debug!(
            "Added pending DDL request for region: {}, ddl: {:?}",
            request.region_id, request.request
        );
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddl_requests.push(request);
    }

    /// Returns whether the region has pending DDL requests (test only).
    #[cfg(test)]
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        let has_pending = self
            .region_status
            .get(&region_id)
            .map(|status| !status.pending_ddl_requests.is_empty())
            .unwrap_or(false);
        debug!(
            "Checked pending DDL requests for region: {}, has_pending: {}",
            region_id, has_pending
        );
        has_pending
    }

    /// Requests cancellation of the region's running compaction.
    pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
        let Some(status) = self.region_status.get_mut(&region_id) else {
            return RequestCancelResult::NotRunning;
        };

        status.request_cancel()
    }

    /// Resolves options, picks files and schedules a compaction task.
    ///
    /// Returns `Ok(None)` when the picker selects nothing (waiters receive
    /// `Ok(0)`), otherwise the handle of the scheduled compaction. Remote
    /// compaction is attempted first when enabled; it may fall back to local
    /// compaction depending on the options.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<Option<ActiveCompaction>> {
        let region_id = request.region_id();
        // Resolve compaction options and TTL, falling back to the region's own
        // options if schema-level options cannot be fetched.
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id.table_id(),
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        // Region options are overlaid with the dynamically resolved
        // compaction options.
        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
        };

        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact: notify waiters and do not schedule a task.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(None);
        };

        // Prefer remote compaction when enabled; get the waiters back if the
        // remote job cannot be scheduled and local fallback is allowed.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(Some(ActiveCompaction::Remote));
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // The scheduler error returns the waiters so the local
                        // task can still notify them.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        // Estimate memory usage from the uncompressed size of input files.
        let estimated_bytes = estimate_compaction_bytes(&picker_output);

        let cancel_handle = Arc::new(CancellationHandle::default());
        let state = LocalCompactionState::new(cancel_handle.clone());
        let local_compaction_task = Box::new(CompactionTaskImpl {
            state: state.clone(),
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| Some(ActiveCompaction::Local { state }))
    }

    /// Submits the local compaction task to the background scheduler.
    fn submit_compaction_task(
        &mut self,
        mut task: Box<CompactionTaskImpl>,
        region_id: RegionId,
    ) -> Result<()> {
        self.scheduler
            .schedule(Box::pin(async move {
                INFLIGHT_COMPACTION_COUNT.inc();
                task.run().await;
                INFLIGHT_COMPACTION_COUNT.dec();
            }))
            .inspect_err(
                |e| error!(e; "Failed to submit compaction request for region {}", region_id),
            )
    }

    /// Removes the region from tracking and fails all of its waiters and
    /// pending requests with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };

        status.on_failure(err);
    }

    /// Removes the region from tracking on cancellation and returns its
    /// pending DDL requests.
    fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
        let Some(status) = self.region_status.remove(&region_id) else {
            return Vec::new();
        };

        status.on_cancel()
    }
}
643
/// Shared state of a running local compaction, used to coordinate
/// cancellation with the commit phase.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle the compaction task polls for cancellation.
    cancel_handle: Arc<CancellationHandle>,
    /// Whether the commit phase has started; once set, cancellation is
    /// rejected.
    commit_started: Arc<Mutex<bool>>,
}
649
/// Handle of the compaction currently running for a region.
#[derive(Debug)]
enum ActiveCompaction {
    /// Local compaction task with cancellable state.
    Local { state: LocalCompactionState },
    /// Compaction delegated to a remote job scheduler.
    Remote,
}
655
656impl LocalCompactionState {
657 fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
658 Self {
659 cancel_handle,
660 commit_started: Arc::new(Mutex::new(false)),
661 }
662 }
663
664 pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
666 self.cancel_handle.clone()
667 }
668
669 pub(crate) fn mark_commit_started(&self) -> bool {
675 let mut commit_started = self.commit_started.lock().unwrap();
676 if self.cancel_handle.is_cancelled() {
677 return false;
678 }
679 *commit_started = true;
680 true
681 }
682
683 pub(crate) fn request_cancel(&self) -> RequestCancelResult {
685 let commit_started = self.commit_started.lock().unwrap();
687 if *commit_started {
688 return RequestCancelResult::TooLateToCancel;
689 }
690 if self.cancel_handle.is_cancelled() {
691 return RequestCancelResult::AlreadyCancelling;
692 }
693
694 self.cancel_handle.cancel();
695 RequestCancelResult::CancelIssued
696 }
697}
698
/// Outcome of a request to cancel a region's compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    /// Cancellation was issued to the running task.
    CancelIssued,
    /// Cancellation was already requested before.
    AlreadyCancelling,
    /// The commit phase already started (or the job is remote); cannot cancel.
    TooLateToCancel,
    /// No compaction is running for the region.
    NotRunning,
}
706
707impl Drop for CompactionScheduler {
708 fn drop(&mut self) {
709 for (region_id, status) in self.region_status.drain() {
710 status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
712 }
713 }
714}
715
716async fn find_dynamic_options(
718 table_id: TableId,
719 region_options: &crate::region::options::RegionOptions,
720 schema_metadata_manager: &SchemaMetadataManagerRef,
721) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
722 if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
723 debug!(
724 "Use region options directly for table {}: compaction={:?}, ttl={:?}",
725 table_id, region_options.compaction, region_options.ttl
726 );
727 return Ok((region_options.compaction.clone(), ttl));
728 }
729
730 let db_options = tokio::time::timeout(
731 crate::config::FETCH_OPTION_TIMEOUT,
732 schema_metadata_manager.get_schema_options_by_table_id(table_id),
733 )
734 .await
735 .context(TimeoutSnafu)?
736 .context(GetSchemaMetadataSnafu)?;
737
738 let ttl = if let Some(ttl) = region_options.ttl {
739 debug!(
740 "Use region TTL directly for table {}: ttl={:?}",
741 table_id, region_options.ttl
742 );
743 ttl
744 } else {
745 db_options
746 .as_ref()
747 .and_then(|options| options.ttl)
748 .unwrap_or_default()
749 .into()
750 };
751
752 let compaction = if !region_options.compaction_override {
753 if let Some(schema_opts) = db_options {
754 let map: HashMap<String, String> = schema_opts
755 .extra_options
756 .iter()
757 .filter_map(|(k, v)| {
758 if k.starts_with("compaction.") {
759 Some((k.clone(), v.clone()))
760 } else {
761 None
762 }
763 })
764 .collect();
765 if map.is_empty() {
766 region_options.compaction.clone()
767 } else {
768 crate::region::options::RegionOptions::try_from(&map)
769 .map(|o| o.compaction)
770 .unwrap_or_else(|e| {
771 error!(e; "Failed to create RegionOptions from map");
772 region_options.compaction.clone()
773 })
774 }
775 } else {
776 debug!(
777 "DB options is None for table {}, use region compaction: compaction={:?}",
778 table_id, region_options.compaction
779 );
780 region_options.compaction.clone()
781 }
782 } else {
783 debug!(
784 "No schema options for table {}, use region compaction: compaction={:?}",
785 table_id, region_options.compaction
786 );
787 region_options.compaction.clone()
788 };
789
790 debug!(
791 "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
792 table_id, compaction, ttl
793 );
794 Ok((compaction, ttl))
795}
796
/// Compaction bookkeeping of a region tracked by the scheduler.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region, used to snapshot the current version.
    version_control: VersionControlRef,
    /// Access layer of the region's SST files.
    access_layer: AccessLayerRef,
    /// Waiters of the running compaction.
    waiters: Vec<OutputTx>,
    /// Pending manual compaction to run after the current one.
    pending_request: Option<PendingCompaction>,
    /// DDL requests deferred until compaction finishes.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// Handle of the currently running compaction, if any.
    active_compaction: Option<ActiveCompaction>,
}
814
impl CompactionStatus {
    /// Creates a status for a region with no running compaction.
    fn new(
        region_id: RegionId,
        version_control: VersionControlRef,
        access_layer: AccessLayerRef,
    ) -> CompactionStatus {
        CompactionStatus {
            region_id,
            version_control,
            access_layer,
            waiters: Vec::new(),
            pending_request: None,
            pending_ddl_requests: Vec::new(),
            active_compaction: None,
        }
    }

    /// Marks a local compaction as running and returns its state (test only).
    #[cfg(test)]
    fn start_local_task(&mut self) -> LocalCompactionState {
        let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
        self.active_compaction = Some(ActiveCompaction::Local {
            state: state.clone(),
        });
        state
    }

    /// Marks a remote compaction as running (test only).
    #[cfg(test)]
    fn start_remote_task(&mut self) {
        self.active_compaction = Some(ActiveCompaction::Remote);
    }

    /// Requests cancellation of the active compaction.
    ///
    /// Remote compactions cannot be cancelled and report `TooLateToCancel`.
    fn request_cancel(&mut self) -> RequestCancelResult {
        let Some(active_compaction) = &self.active_compaction else {
            return RequestCancelResult::NotRunning;
        };

        match active_compaction {
            ActiveCompaction::Local { state, .. } => state.request_cancel(),
            ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
        }
    }

    /// Clears the active compaction, returning whether one was running.
    fn clear_running_task(&mut self) -> bool {
        self.active_compaction.take().is_some()
    }

    /// Moves the waiter (if any) into the waiters of the running compaction.
    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
        if let Some(waiter) = waiter.take_inner() {
            self.waiters.push(waiter);
        }
    }

    /// Stores a pending manual compaction; any previously pending request is
    /// replaced and its waiter fails with a manual-compaction-override error.
    fn set_pending_request(&mut self, pending: PendingCompaction) {
        if let Some(prev) = self.pending_request.replace(pending) {
            debug!(
                "Replace pending compaction options with new request {:?} for region: {}",
                prev.options, self.region_id
            );
            prev.waiter.send(ManualCompactionOverrideSnafu.fail());
        }
    }

    /// Fails all waiters, the pending request and pending DDLs with `err`,
    /// consuming the status.
    fn on_failure(mut self, err: Arc<Error>) {
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id: self.region_id,
            }));
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction
                .waiter
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }

        for pending_ddl in self.pending_ddl_requests {
            pending_ddl
                .sender
                .send(Err(err.clone()).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }));
        }
    }

    /// Fails waiters and the pending request with a cancellation error and
    /// returns the pending DDL requests for the caller to process.
    #[must_use]
    fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
        for waiter in self.waiters.drain(..) {
            waiter.send(CompactionCancelledSnafu.fail());
        }

        if let Some(pending_compaction) = self.pending_request {
            pending_compaction.waiter.send(
                Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
                    region_id: self.region_id,
                }),
            );
        }

        std::mem::take(&mut self.pending_ddl_requests)
    }

    /// Builds a compaction request from the current region version, draining
    /// the accumulated waiters and merging `waiter` into them.
    #[allow(clippy::too_many_arguments)]
    fn new_compaction_request(
        &mut self,
        request_sender: Sender<WorkerRequestWithTime>,
        mut waiter: OptionOutputTx,
        engine_config: Arc<MitoConfig>,
        cache_manager: CacheManagerRef,
        manifest_ctx: &ManifestContextRef,
        listener: WorkerListener,
        schema_metadata_manager: SchemaMetadataManagerRef,
        max_parallelism: usize,
    ) -> CompactionRequest {
        let current_version = CompactionVersion::from(self.version_control.current().version);
        let start_time = Instant::now();
        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
        waiters.extend(std::mem::take(&mut self.waiters));

        if let Some(waiter) = waiter.take_inner() {
            waiters.push(waiter);
        }

        CompactionRequest {
            engine_config,
            current_version,
            access_layer: self.access_layer.clone(),
            request_sender: request_sender.clone(),
            waiters,
            start_time,
            cache_manager,
            manifest_ctx: manifest_ctx.clone(),
            listener,
            schema_metadata_manager,
            max_parallelism,
        }
    }
}
960
/// A unit of work of a compaction: merges `inputs` into `output_level`.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level the merged SST is written to.
    pub output_level: Level,
    /// Input files to merge.
    pub inputs: Vec<FileHandle>,
    /// Whether deletion markers are filtered out during the merge.
    pub filter_deleted: bool,
    /// Optional time range the output is restricted to.
    pub output_time_range: Option<TimestampRange>,
}
972
/// Serializable counterpart of [`CompactionOutput`], carrying [`FileMeta`]
/// instead of runtime file handles (e.g. for remote compaction jobs).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    // Level the merged SST is written to.
    output_level: Level,
    // Metadata of input files to merge.
    inputs: Vec<FileMeta>,
    // Whether deletion markers are filtered out during the merge.
    filter_deleted: bool,
    // Optional time range the output is restricted to.
    output_time_range: Option<TimestampRange>,
}
981
/// Builder of the SST reader that feeds a compaction merge.
struct CompactionSstReaderBuilder<'a> {
    /// Metadata of the region being compacted.
    metadata: RegionMetadataRef,
    /// Layer to access the input SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager, wrapped in a compaction cache strategy.
    cache: CacheManagerRef,
    /// Input files to read.
    inputs: &'a [FileHandle],
    /// Whether the region is in append mode.
    append_mode: bool,
    /// Whether to filter out deletion markers while reading.
    filter_deleted: bool,
    /// Optional time range used to build a read predicate.
    time_range: Option<TimestampRange>,
    /// Merge mode of the region.
    merge_mode: MergeMode,
}
993
impl CompactionSstReaderBuilder<'_> {
    /// Builds a flat-format source that reads and merges all input SSTs.
    async fn build_flat_sst_reader(self) -> Result<FlatSource> {
        let scan_input = self.build_scan_input()?.with_compaction(true);

        let schema = scan_input.mapper.output_schema();
        let schema = schema.arrow_schema();

        SeqScan::new(scan_input)
            .build_flat_reader_for_compaction()
            .await
            .map(|stream| FlatSource::new_stream(schema.clone(), stream))
    }

    /// Builds the scan input over the input files, applying the optional
    /// time-range predicate.
    fn build_scan_input(self) -> Result<ScanInput> {
        // Project all columns: compaction rewrites complete rows.
        let mapper = FlatProjectionMapper::all(&self.metadata)?;
        let mut scan_input = ScanInput::new(self.sst_layer, mapper)
            .with_files(self.inputs.to_vec())
            .with_append_mode(self.append_mode)
            .with_cache(CacheStrategy::Compaction(self.cache))
            .with_filter_deleted(self.filter_deleted)
            // Input files may have been purged concurrently; tolerate that.
            .with_ignore_file_not_found(true)
            .with_merge_mode(self.merge_mode);

        if let Some(time_range) = self.time_range {
            scan_input =
                scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
        }

        Ok(scan_input)
    }
}
1031
1032fn time_range_to_predicate(
1034 range: TimestampRange,
1035 metadata: &RegionMetadataRef,
1036) -> Result<PredicateGroup> {
1037 let ts_col = metadata.time_index_column();
1038
1039 let ts_col_unit = ts_col
1041 .column_schema
1042 .data_type
1043 .as_timestamp()
1044 .unwrap()
1045 .unit();
1046
1047 let exprs = match (range.start(), range.end()) {
1048 (Some(start), Some(end)) => {
1049 vec![
1050 datafusion_expr::col(ts_col.column_schema.name.clone())
1051 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1052 datafusion_expr::col(ts_col.column_schema.name.clone())
1053 .lt(ts_to_lit(*end, ts_col_unit)?),
1054 ]
1055 }
1056 (Some(start), None) => {
1057 vec![
1058 datafusion_expr::col(ts_col.column_schema.name.clone())
1059 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1060 ]
1061 }
1062
1063 (None, Some(end)) => {
1064 vec![
1065 datafusion_expr::col(ts_col.column_schema.name.clone())
1066 .lt(ts_to_lit(*end, ts_col_unit)?),
1067 ]
1068 }
1069 (None, None) => {
1070 return Ok(PredicateGroup::default());
1071 }
1072 };
1073
1074 let predicate = PredicateGroup::new(metadata, &exprs)?;
1075 Ok(predicate)
1076}
1077
1078fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
1079 let ts = ts
1080 .convert_to(ts_col_unit)
1081 .context(TimeRangePredicateOverflowSnafu {
1082 timestamp: ts,
1083 unit: ts_col_unit,
1084 })?;
1085 let val = ts.value();
1086 let scalar_value = match ts_col_unit {
1087 TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
1088 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
1089 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
1090 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
1091 };
1092 Ok(datafusion_expr::lit(scalar_value))
1093}
1094
1095fn get_expired_ssts(
1097 levels: &[LevelMeta],
1098 ttl: Option<TimeToLive>,
1099 now: Timestamp,
1100) -> Vec<FileHandle> {
1101 let Some(ttl) = ttl else {
1102 return vec![];
1103 };
1104
1105 levels
1106 .iter()
1107 .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
1108 .collect()
1109}
1110
1111fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
1114 picker_output
1115 .outputs
1116 .iter()
1117 .flat_map(|output| output.inputs.iter())
1118 .map(|file: &FileHandle| {
1119 let meta = file.meta_ref();
1120 meta.max_row_group_uncompressed_size
1121 })
1122 .sum()
1123}
1124
/// A manual compaction request deferred until the running compaction of the
/// region finishes.
struct PendingCompaction {
    /// Compaction options of the deferred request.
    pub(crate) options: compact_request::Options,
    /// Waiter to notify with the result of the deferred request.
    pub(crate) waiter: OptionOutputTx,
    /// Max parallelism requested for the deferred compaction.
    pub(crate) max_parallelism: usize,
}
1135
1136#[cfg(test)]
1137mod tests {
1138 use std::assert_matches;
1139 use std::time::Duration;
1140
1141 use api::v1::region::StrictWindow;
1142 use common_datasource::compression::CompressionType;
1143 use common_meta::key::schema_name::SchemaNameValue;
1144 use common_time::DatabaseTimeToLive;
1145 use tokio::sync::{Barrier, oneshot};
1146
1147 use super::*;
1148 use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
1149 use crate::error::InvalidSchedulerStateSnafu;
1150 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1151 use crate::region::ManifestContext;
1152 use crate::schedule::scheduler::{Job, Scheduler};
1153 use crate::sst::FormatType;
1154 use crate::test_util::mock_schema_metadata_manager;
1155 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1156 use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1157
1158 struct FailingScheduler;
1159
1160 #[async_trait::async_trait]
1161 impl Scheduler for FailingScheduler {
1162 fn schedule(&self, _job: Job) -> Result<()> {
1163 InvalidSchedulerStateSnafu.fail()
1164 }
1165
1166 async fn stop(&self, _await_termination: bool) -> Result<()> {
1167 Ok(())
1168 }
1169 }
1170
    /// Database-level compaction options stored in the schema's extra options
    /// ("compaction.type" / "compaction.twcs.time_window") should be resolved
    /// by `find_dynamic_options`, and a regular compaction request should then
    /// schedule successfully with those options.
    #[tokio::test]
    async fn test_find_compaction_options_db_level() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        let region_id = builder.region_id();
        let table_id = region_id.table_id();
        // Schema value carrying db-level compaction options: twcs with a 2h window.
        let mut schema_value = SchemaNameValue {
            ttl: Some(DatabaseTimeToLive::default()),
            ..Default::default()
        };
        schema_value
            .extra_options
            .insert("compaction.type".to_string(), "twcs".to_string());
        schema_value
            .extra_options
            .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
        schema_metadata_manager
            .register_region_table_info(
                table_id,
                "t",
                "c",
                "s",
                Some(schema_value),
                kv_backend.clone(),
            )
            .await;

        let version_control = Arc::new(builder.build());
        let region_opts = version_control.current().version.options.clone();
        let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
            .await
            .unwrap();
        // The db-level "2h" window must show up in the resolved twcs options.
        match opts {
            crate::region::options::CompactionOptions::Twcs(t) => {
                assert_eq!(t.time_window_seconds(), Some(2 * 3600));
            }
        }
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let (otx, _orx) = oneshot::channel();
        // Build a compaction request through the region's status entry, then
        // schedule it; this must not error with the db-level options in place.
        let request = scheduler
            .region_status
            .entry(region_id)
            .or_insert_with(|| {
                crate::compaction::CompactionStatus::new(
                    region_id,
                    version_control.clone(),
                    env.access_layer.clone(),
                )
            })
            .new_compaction_request(
                scheduler.request_sender.clone(),
                OptionOutputTx::new(Some(OutputTx::new(otx))),
                scheduler.engine_config.clone(),
                scheduler.cache_manager.clone(),
                &manifest_ctx,
                scheduler.listener.clone(),
                schema_metadata_manager.clone(),
                1,
            );
        scheduler
            .schedule_compaction_request(
                request,
                compact_request::Options::Regular(Default::default()),
            )
            .await
            .unwrap();
    }
1244
    /// Table-level compaction overrides must take priority over database-level
    /// schema options, and db-level options apply only when no override is set.
    #[tokio::test]
    async fn test_find_compaction_options_priority() {
        // Helper: schema value with db-level twcs options and the given window.
        fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
            let mut schema_value = SchemaNameValue {
                ttl: Some(DatabaseTimeToLive::default()),
                ..Default::default()
            };
            schema_value
                .extra_options
                .insert("compaction.type".to_string(), "twcs".to_string());
            schema_value.extra_options.insert(
                "compaction.twcs.time_window".to_string(),
                time_window.to_string(),
            );
            schema_value
        }

        // Each case: (name, db-level schema value, table override flag,
        //             table-level window, expected resolved window seconds).
        let cases = [
            (
                "db options set and table override set",
                Some(schema_value_with_twcs("2h")),
                true,
                Some(Duration::from_secs(5 * 3600)),
                Some(5 * 3600),
            ),
            (
                "db options set and table override not set",
                Some(schema_value_with_twcs("2h")),
                false,
                None,
                Some(2 * 3600),
            ),
            (
                "db options not set and table override set",
                None,
                true,
                Some(Duration::from_secs(4 * 3600)),
                Some(4 * 3600),
            ),
            (
                "db options not set and table override not set",
                None,
                false,
                None,
                None,
            ),
        ];

        for (case_name, schema_value, override_set, table_window, expected_window) in cases {
            let builder = VersionControlBuilder::new();
            let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
            let table_id = builder.region_id().table_id();
            schema_metadata_manager
                .register_region_table_info(
                    table_id,
                    "t",
                    "c",
                    "s",
                    schema_value,
                    kv_backend.clone(),
                )
                .await;

            let version_control = Arc::new(builder.build());
            let mut region_opts = version_control.current().version.options.clone();
            region_opts.compaction_override = override_set;
            // Apply the table-level window to the region's twcs options, if any.
            if let Some(window) = table_window {
                let crate::region::options::CompactionOptions::Twcs(twcs) =
                    &mut region_opts.compaction;
                twcs.time_window = Some(window);
            }

            let (opts, _) = find_dynamic_options(table_id, &region_opts, &schema_metadata_manager)
                .await
                .unwrap();
            match opts {
                crate::region::options::CompactionOptions::Twcs(t) => {
                    assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
                }
            }
        }
    }
1327
    /// Scheduling compaction when there is nothing to compact must resolve the
    /// waiter with 0 and leave no region status behind — both for an empty
    /// region and for a region with a single L0 file.
    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;
        // Round 1: completely empty region.
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());

        // Round 2: a single L0 file still yields no compaction job.
        let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
        let (output_tx, output_rx) = oneshot::channel();
        let waiter = OptionOutputTx::from(output_tx);
        scheduler
            .schedule_compaction(
                builder.region_id(),
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                waiter,
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }
1390
    /// While a compaction is running, newly scheduled requests become waiters
    /// instead of new jobs; when the running compaction finishes, the next job
    /// is dispatched from the queued requests.
    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Five overlapping L0 files so the picker produces a compaction job.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // One job dispatched, one status tracked.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Simulate the running compaction's edit: replace the old L0 files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // Scheduling again while a job is in flight queues a waiter instead of
        // launching a second job.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // Finishing the first compaction must dispatch the next job.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // A further request while the second job runs queues another waiter;
        // job count stays at 2.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }
1523
    /// If the underlying job scheduler rejects the job, `schedule_compaction`
    /// must surface the error and must NOT leave a stale region status entry.
    #[tokio::test]
    async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
        common_telemetry::init_default_ut_logging();
        // FailingScheduler rejects every job.
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        // Enough overlapping L0 files that a compaction job would be produced.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let result = scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await;

        assert!(result.is_err());
        // No status entry may be published for the failed schedule.
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
1575
    /// A manual (StrictWindow) compaction requested while a regular compaction
    /// is in progress is stored as the pending request and dispatched once the
    /// running compaction finishes.
    #[tokio::test]
    async fn test_manual_compaction_when_compaction_in_progress() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        // Five overlapping L0 files so the picker produces a compaction job.
        let end = 1000 * 1000;
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // Apply an edit replacing the original L0 files.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );

        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        // Regular compaction is running; nothing pending yet.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(
            scheduler
                .region_status
                .get(&region_id)
                .unwrap()
                .pending_request
                .is_none()
        );

        let (tx, _rx) = oneshot::channel();
        // Manual compaction while busy: stored as pending, no extra job yet.
        scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_some());

        // Finishing the running compaction dispatches the pending manual one.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
        assert!(status.pending_request.is_none());
    }
1684
1685 #[tokio::test]
1686 async fn test_compaction_bypass_in_staging_mode() {
1687 let env = SchedulerEnv::new().await;
1688 let (tx, _rx) = mpsc::channel(4);
1689 let mut scheduler = env.mock_compaction_scheduler(tx);
1690
1691 let builder = VersionControlBuilder::new();
1693 let version_control = Arc::new(builder.build());
1694 let region_id = version_control.current().version.metadata.region_id;
1695
1696 let staging_manifest_ctx = {
1698 let manager = RegionManifestManager::new(
1699 version_control.current().version.metadata.clone(),
1700 0,
1701 RegionManifestOptions {
1702 manifest_dir: "".to_string(),
1703 object_store: env.access_layer.object_store().clone(),
1704 compress_type: CompressionType::Uncompressed,
1705 checkpoint_distance: 10,
1706 remove_file_options: Default::default(),
1707 manifest_cache: None,
1708 },
1709 FormatType::PrimaryKey,
1710 &Default::default(),
1711 )
1712 .await
1713 .unwrap();
1714 Arc::new(ManifestContext::new(
1715 manager,
1716 RegionRoleState::Leader(RegionLeaderState::Staging),
1717 ))
1718 };
1719
1720 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1721
1722 let (tx, rx) = oneshot::channel();
1724 scheduler
1725 .schedule_compaction(
1726 region_id,
1727 compact_request::Options::Regular(Default::default()),
1728 &version_control,
1729 &env.access_layer,
1730 OptionOutputTx::new(Some(OutputTx::new(tx))),
1731 &staging_manifest_ctx,
1732 schema_metadata_manager,
1733 1,
1734 )
1735 .await
1736 .unwrap();
1737
1738 let result = rx.await.unwrap();
1739 assert_eq!(result.unwrap(), 0); assert_eq!(0, scheduler.region_status.len());
1741 }
1742
1743 #[tokio::test]
1744 async fn test_add_ddl_request_to_pending() {
1745 let env = SchedulerEnv::new().await;
1746 let (tx, _rx) = mpsc::channel(4);
1747 let mut scheduler = env.mock_compaction_scheduler(tx);
1748 let builder = VersionControlBuilder::new();
1749 let version_control = Arc::new(builder.build());
1750 let region_id = builder.region_id();
1751
1752 scheduler.region_status.insert(
1753 region_id,
1754 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1755 );
1756 scheduler
1757 .region_status
1758 .get_mut(®ion_id)
1759 .unwrap()
1760 .start_local_task();
1761
1762 let (output_tx, _output_rx) = oneshot::channel();
1763 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1764 region_id,
1765 sender: OptionOutputTx::from(output_tx),
1766 request: crate::request::DdlRequest::EnterStaging(
1767 store_api::region_request::EnterStagingRequest {
1768 partition_directive:
1769 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1770 },
1771 ),
1772 });
1773
1774 assert!(scheduler.has_pending_ddls(region_id));
1775 }
1776
    /// Exercises the cancel state machine of a local compaction task:
    /// first cancel issues, repeated cancels report AlreadyCancelling
    /// (including after commit start), and a cleared task reports NotRunning.
    #[tokio::test]
    async fn test_request_cancel_state_transitions() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let region_id = builder.region_id();
        let version_control = Arc::new(builder.build());
        let mut status =
            CompactionStatus::new(region_id, version_control, env.access_layer.clone());
        let state = status.start_local_task();

        // First cancel flips the task's cancellation handle.
        assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
        assert!(state.cancel_handle().is_cancelled());
        assert_eq!(
            status.request_cancel(),
            RequestCancelResult::AlreadyCancelling
        );

        // Commit cannot start once cancelled; cancel result is unchanged.
        assert!(!state.mark_commit_started());
        assert_eq!(
            status.request_cancel(),
            RequestCancelResult::AlreadyCancelling
        );

        // After the running task is cleared there is nothing to cancel.
        assert!(status.clear_running_task());
        assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
    }
1803
1804 #[tokio::test]
1805 async fn test_request_cancel_remote_compaction_is_too_late() {
1806 let env = SchedulerEnv::new().await;
1807 let builder = VersionControlBuilder::new();
1808 let region_id = builder.region_id();
1809 let version_control = Arc::new(builder.build());
1810 let mut status =
1811 CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1812
1813 status.start_remote_task();
1814
1815 assert_eq!(
1816 status.request_cancel(),
1817 RequestCancelResult::TooLateToCancel
1818 );
1819 assert!(status.active_compaction.is_some());
1820 }
1821
    /// Cancelling a compaction must hand back the pending DDL requests, drop
    /// the region status, and dispatch no new compaction job.
    #[tokio::test]
    async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let _manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Register the region with a running local task.
        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.has_pending_ddls(region_id));
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
    }
1865
    /// On cancellation, pending DDLs win over a pending manual compaction:
    /// the DDLs are returned and the pending compaction's waiter gets an error.
    #[tokio::test]
    async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let _manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        // Running task plus a pending manual (StrictWindow) compaction.
        let status = scheduler.region_status.get_mut(&region_id).unwrap();
        status.start_local_task();
        let (manual_tx, manual_rx) = oneshot::channel();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
        // The pending manual compaction is failed rather than dispatched.
        assert_matches!(manual_rx.await.unwrap(), Err(_));
    }
1912
    /// When compaction fails, pending DDL requests for the region are drained
    /// and their waiters receive an error.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_compaction_failed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler
            .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
1947
    /// Closing a region drains its pending DDL requests and fails their waiters.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_closed() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_closed(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
1981
    /// Dropping a region drains its pending DDL requests and fails their waiters.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_dropped() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_dropped(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
2015
    /// Truncating a region drains its pending DDL requests and fails their waiters.
    #[tokio::test]
    async fn test_pending_ddl_request_failed_on_region_truncated() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );

        let (output_tx, output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        assert!(scheduler.has_pending_ddls(region_id));
        scheduler.on_region_truncated(region_id);

        assert!(!scheduler.has_pending_ddls(region_id));
        let result = output_rx.await.unwrap();
        assert_matches!(result, Err(_));
    }
2049
    /// When a compaction finishes and DDLs are pending, they are returned to
    /// the caller, the region status is removed, and no new job is scheduled.
    #[tokio::test]
    async fn test_on_compaction_finished_returns_pending_ddl_requests() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Register the region with a running local task.
        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let (output_tx, _output_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(output_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.has_pending_ddls(region_id));
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert_eq!(job_scheduler.num_jobs(), 0);
    }
2095
    /// If the pending manual compaction turns out to be a no-op (empty region),
    /// its waiter resolves with 0 and the pending DDLs are still replayed.
    #[tokio::test]
    async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Running task plus a pending regular compaction request.
        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.start_local_task();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, _ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert_eq!(pending_ddls.len(), 1);
        assert!(!scheduler.region_status.contains_key(&region_id));
        // The no-op manual compaction resolves its waiter with 0.
        assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
    }
2140
2141 #[tokio::test]
2142 async fn test_on_compaction_finished_returns_empty_when_region_absent() {
2143 let env = SchedulerEnv::new().await;
2144 let (tx, _rx) = mpsc::channel(4);
2145 let mut scheduler = env.mock_compaction_scheduler(tx);
2146 let builder = VersionControlBuilder::new();
2147 let region_id = builder.region_id();
2148 let version_control = Arc::new(builder.build());
2149 let manifest_ctx = env
2150 .mock_manifest_context(version_control.current().version.metadata.clone())
2151 .await;
2152 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2153
2154 let pending_ddls = scheduler
2155 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2156 .await;
2157
2158 assert!(pending_ddls.is_empty());
2159 }
2160
    /// If scheduling the pending manual compaction fails after the previous
    /// one finishes, the status is removed and both the manual waiter and the
    /// pending DDL waiter observe failure.
    #[tokio::test]
    async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
        // FailingScheduler rejects every job.
        let env = SchedulerEnv::new()
            .await
            .scheduler(Arc::new(FailingScheduler));
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let end = 1000 * 1000;
        // Enough overlapping L0 files that the pending compaction is not a no-op.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        // Running task plus a pending regular compaction request.
        let (manual_tx, manual_rx) = oneshot::channel();
        let mut status =
            CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
        status.start_local_task();
        status.set_pending_request(PendingCompaction {
            options: compact_request::Options::Regular(Default::default()),
            waiter: OptionOutputTx::from(manual_tx),
            max_parallelism: 1,
        });
        scheduler.region_status.insert(region_id, status);

        let (ddl_tx, ddl_rx) = oneshot::channel();
        scheduler.add_ddl_request_to_pending(SenderDdlRequest {
            region_id,
            sender: OptionOutputTx::from(ddl_tx),
            request: crate::request::DdlRequest::EnterStaging(
                store_api::region_request::EnterStagingRequest {
                    partition_directive:
                        store_api::region_request::StagingPartitionDirective::RejectAllWrites,
                },
            ),
        });

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        // Schedule failure: nothing replayed, status gone, waiters failed.
        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
        assert!(manual_rx.await.is_err());
        assert_matches!(ddl_rx.await.unwrap(), Err(_));
    }
2217
    /// When the follow-up schedule after a finished compaction is a no-op
    /// (empty region, no pending requests), the region status is removed.
    #[tokio::test]
    async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let region_id = builder.region_id();
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();

        scheduler.region_status.insert(
            region_id,
            CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
        );
        scheduler
            .region_status
            .get_mut(&region_id)
            .unwrap()
            .start_local_task();

        let pending_ddls = scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
            .await;

        assert!(pending_ddls.is_empty());
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2248
2249 #[tokio::test]
2250 async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
2251 let env = SchedulerEnv::new()
2252 .await
2253 .scheduler(Arc::new(FailingScheduler));
2254 let (tx, _rx) = mpsc::channel(4);
2255 let mut scheduler = env.mock_compaction_scheduler(tx);
2256 let mut builder = VersionControlBuilder::new();
2257 let end = 1000 * 1000;
2258 let version_control = Arc::new(
2259 builder
2260 .push_l0_file(0, end)
2261 .push_l0_file(10, end)
2262 .push_l0_file(50, end)
2263 .push_l0_file(80, end)
2264 .push_l0_file(90, end)
2265 .build(),
2266 );
2267 let region_id = builder.region_id();
2268 let manifest_ctx = env
2269 .mock_manifest_context(version_control.current().version.metadata.clone())
2270 .await;
2271 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2272
2273 scheduler.region_status.insert(
2274 region_id,
2275 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2276 );
2277 scheduler
2278 .region_status
2279 .get_mut(®ion_id)
2280 .unwrap()
2281 .start_local_task();
2282
2283 let pending_ddls = scheduler
2284 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2285 .await;
2286
2287 assert!(pending_ddls.is_empty());
2288 assert!(!scheduler.region_status.contains_key(®ion_id));
2289 }
2290
2291 #[tokio::test]
2292 async fn test_concurrent_memory_competition() {
2293 let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); let barrier = Arc::new(Barrier::new(3));
2295 let mut handles = vec![];
2296
2297 for _i in 0..3 {
2299 let mgr = manager.clone();
2300 let bar = barrier.clone();
2301 let handle = tokio::spawn(async move {
2302 bar.wait().await; mgr.try_acquire(2 * 1024 * 1024)
2304 });
2305 handles.push(handle);
2306 }
2307
2308 let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
2309 .await
2310 .into_iter()
2311 .map(|r| r.unwrap())
2312 .collect();
2313
2314 let succeeded = results.iter().filter(|r| r.is_some()).count();
2316 let failed = results.iter().filter(|r| r.is_none()).count();
2317
2318 assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
2319 assert_eq!(failed, 2, "Expected 2 tasks to fail");
2320
2321 drop(results);
2323 assert_eq!(manager.used_bytes(), 0);
2324 }
2325}