1mod buckets;
16pub mod compactor;
17pub mod memory_manager;
18pub mod picker;
19pub mod run;
20mod task;
21#[cfg(test)]
22mod test_util;
23mod twcs;
24mod window;
25
26use std::collections::HashMap;
27use std::sync::{Arc, Mutex};
28use std::time::Instant;
29
30use api::v1::region::compact_request;
31use api::v1::region::compact_request::Options;
32use arrow_schema::Schema;
33use common_base::Plugins;
34use common_base::cancellation::CancellationHandle;
35use common_memory_manager::OnExhaustedPolicy;
36use common_meta::key::SchemaMetadataManagerRef;
37use common_telemetry::{debug, error, info, warn};
38use common_time::range::TimestampRange;
39use common_time::timestamp::TimeUnit;
40use common_time::{TimeToLive, Timestamp};
41use datafusion_common::ScalarValue;
42use datafusion_expr::Expr;
43use datatypes::extension::json::is_structured_json_field;
44use datatypes::types::json_type::JsonNativeType;
45use parquet::arrow::parquet_to_arrow_schema;
46use parquet::file::metadata::PageIndexPolicy;
47use serde::{Deserialize, Serialize};
48use snafu::{OptionExt, ResultExt};
49use store_api::metadata::RegionMetadataRef;
50use store_api::storage::RegionId;
51use task::MAX_PARALLEL_COMPACTION;
52use tokio::sync::mpsc::{self, Sender};
53
54use crate::access_layer::AccessLayerRef;
55use crate::cache::{CacheManagerRef, CacheStrategy};
56use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
57use crate::compaction::memory_manager::CompactionMemoryManager;
58use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
59use crate::compaction::task::CompactionTaskImpl;
60use crate::config::MitoConfig;
61use crate::error::{
62 CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
63 GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
64 RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
65 TimeRangePredicateOverflowSnafu, TimeoutSnafu,
66};
67use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
68use crate::read::FlatSource;
69use crate::read::flat_projection::FlatProjectionMapper;
70use crate::read::read_columns::ReadColumns;
71use crate::read::scan_region::{PredicateGroup, ScanInput};
72use crate::read::seq_scan::SeqScan;
73use crate::region::options::{MergeMode, RegionOptions};
74use crate::region::version::VersionControlRef;
75use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
76use crate::request::{OptionOutputTx, OutputTx, SenderDdlRequest, WorkerRequestWithTime};
77use crate::schedule::remote_job_scheduler::{
78 CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
79};
80use crate::schedule::scheduler::SchedulerRef;
81use crate::sst::file::{FileHandle, FileMeta, Level};
82use crate::sst::parquet::reader::MetadataCacheMetrics;
83use crate::sst::version::LevelMeta;
84use crate::worker::WorkerListener;
85
/// Everything needed to pick and run one compaction of a region.
pub struct CompactionRequest {
    /// Engine configuration handed over by the scheduler.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Region version captured when the request was built.
    pub(crate) current_version: CompactionVersion,
    /// Access layer of the region being compacted.
    pub(crate) access_layer: AccessLayerRef,
    /// Channel carrying `WorkerRequestWithTime` messages
    /// (presumably back to the region worker — confirm against the receiver).
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
    /// Result senders that are answered with the outcome of this compaction.
    pub(crate) waiters: Vec<OutputTx>,
    /// Instant at which the request was built.
    pub(crate) start_time: Instant,
    /// Cache manager used while compacting.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context of the region.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Listener notified about compaction events.
    pub(crate) listener: WorkerListener,
    /// Used to look up schema (database) level options for the region's table.
    pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
    /// Parallelism limit forwarded to the compaction region.
    pub(crate) max_parallelism: usize,
}
103
104impl CompactionRequest {
105 pub(crate) fn region_id(&self) -> RegionId {
106 self.current_version.metadata.region_id
107 }
108}
109
/// Schedules compactions and tracks the compaction status of each region.
pub(crate) struct CompactionScheduler {
    /// Scheduler that runs local compaction jobs.
    scheduler: SchedulerRef,
    /// Status of every region that currently has a compaction tracked by this scheduler.
    region_status: HashMap<RegionId, CompactionStatus>,
    /// Request sender cloned into every compaction request.
    request_sender: Sender<WorkerRequestWithTime>,
    /// Cache manager cloned into every compaction request.
    cache_manager: CacheManagerRef,
    /// Engine configuration cloned into every compaction request.
    engine_config: Arc<MitoConfig>,
    /// Memory manager handed to local compaction tasks.
    memory_manager: Arc<CompactionMemoryManager>,
    /// Policy handed to local compaction tasks together with the memory manager.
    memory_policy: OnExhaustedPolicy,
    /// Listener notified when a compaction gets scheduled.
    listener: WorkerListener,
    /// Plugins; queried for a `RemoteJobSchedulerRef` when remote compaction is enabled.
    plugins: Plugins,
}
125
126impl CompactionScheduler {
127 #[allow(clippy::too_many_arguments)]
128 pub(crate) fn new(
129 scheduler: SchedulerRef,
130 request_sender: Sender<WorkerRequestWithTime>,
131 cache_manager: CacheManagerRef,
132 engine_config: Arc<MitoConfig>,
133 listener: WorkerListener,
134 plugins: Plugins,
135 memory_manager: Arc<CompactionMemoryManager>,
136 memory_policy: OnExhaustedPolicy,
137 ) -> Self {
138 Self {
139 scheduler,
140 region_status: HashMap::new(),
141 request_sender,
142 cache_manager,
143 engine_config,
144 memory_manager,
145 memory_policy,
146 listener,
147 plugins,
148 }
149 }
150
151 #[allow(clippy::too_many_arguments)]
154 pub(crate) async fn schedule_compaction(
155 &mut self,
156 region_id: RegionId,
157 compact_options: compact_request::Options,
158 version_control: &VersionControlRef,
159 access_layer: &AccessLayerRef,
160 waiter: OptionOutputTx,
161 manifest_ctx: &ManifestContextRef,
162 schema_metadata_manager: SchemaMetadataManagerRef,
163 max_parallelism: usize,
164 ) -> Result<bool> {
165 let current_state = manifest_ctx.current_state();
167 if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
168 info!(
169 "Skipping compaction for region {} in staging mode, options: {:?}",
170 region_id, compact_options
171 );
172 waiter.send(Ok(0));
173 return Ok(false);
174 }
175
176 if let Some(status) = self.region_status.get_mut(®ion_id) {
177 match compact_options {
178 Options::Regular(_) => {
179 status.merge_waiter(waiter);
181 }
182 options @ Options::StrictWindow(_) => {
183 status.set_pending_request(PendingCompaction {
185 options,
186 waiter,
187 max_parallelism,
188 });
189 info!(
190 "Region {} is compacting, manually compaction will be re-scheduled.",
191 region_id
192 );
193 }
194 }
195 return Ok(false);
196 }
197
198 let mut status =
200 CompactionStatus::new(region_id, version_control.clone(), access_layer.clone());
201 let request = status.new_compaction_request(
202 self.request_sender.clone(),
203 waiter,
204 self.engine_config.clone(),
205 self.cache_manager.clone(),
206 manifest_ctx,
207 self.listener.clone(),
208 schema_metadata_manager,
209 max_parallelism,
210 );
211
212 match self
213 .schedule_compaction_request(request, compact_options)
214 .await
215 {
216 Ok(Some(active_compaction)) => {
217 status.active_compaction = Some(active_compaction);
221 self.region_status.insert(region_id, status);
222
223 self.listener.on_compaction_scheduled(region_id);
224 Ok(true)
225 }
226 Ok(None) => Ok(false),
227 Err(e) => Err(e),
228 }
229 }
230
231 pub(crate) async fn handle_pending_compaction_request(
235 &mut self,
236 region_id: RegionId,
237 manifest_ctx: &ManifestContextRef,
238 schema_metadata_manager: SchemaMetadataManagerRef,
239 ) -> bool {
240 let Some(status) = self.region_status.get_mut(®ion_id) else {
241 return true;
242 };
243
244 let Some(pending_request) = std::mem::take(&mut status.pending_request) else {
247 return false;
248 };
249
250 let PendingCompaction {
251 options,
252 waiter,
253 max_parallelism,
254 } = pending_request;
255
256 let request = {
257 status.new_compaction_request(
258 self.request_sender.clone(),
259 waiter,
260 self.engine_config.clone(),
261 self.cache_manager.clone(),
262 manifest_ctx,
263 self.listener.clone(),
264 schema_metadata_manager,
265 max_parallelism,
266 )
267 };
268
269 match self.schedule_compaction_request(request, options).await {
270 Ok(Some(active_compaction)) => {
271 let status = self.region_status.get_mut(®ion_id).unwrap();
272 status.active_compaction = Some(active_compaction);
273 debug!(
274 "Successfully scheduled manual compaction for region id: {}",
275 region_id
276 );
277 true
278 }
279 Ok(None) => {
280 false
283 }
284 Err(e) => {
285 error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
286 self.remove_region_on_failure(region_id, Arc::new(e));
287 true
288 }
289 }
290 }
291
292 pub(crate) async fn on_compaction_finished(
294 &mut self,
295 region_id: RegionId,
296 manifest_ctx: &ManifestContextRef,
297 schema_metadata_manager: SchemaMetadataManagerRef,
298 ) -> Vec<SenderDdlRequest> {
299 let Some(status) = self.region_status.get_mut(®ion_id) else {
300 return Vec::new();
301 };
302 status.clear_running_task();
303
304 if self
307 .handle_pending_compaction_request(
308 region_id,
309 manifest_ctx,
310 schema_metadata_manager.clone(),
311 )
312 .await
313 {
314 return Vec::new();
315 }
316
317 let Some(status) = self.region_status.get_mut(®ion_id) else {
318 return Vec::new();
321 };
322
323 for waiter in std::mem::take(&mut status.waiters) {
324 waiter.send(Ok(0));
325 }
326
327 let pending_ddl_requests = std::mem::take(&mut status.pending_ddl_requests);
329 if !pending_ddl_requests.is_empty() {
330 self.region_status.remove(®ion_id);
331 return pending_ddl_requests;
334 }
335 Vec::new()
336 }
337
338 pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool {
339 self.region_status
340 .get(®ion_id)
341 .map(|status| status.active_compaction.is_some())
342 .unwrap_or(false)
343 }
344
345 pub(crate) async fn schedule_next_compaction(
348 &mut self,
349 region_id: RegionId,
350 manifest_ctx: &ManifestContextRef,
351 schema_metadata_manager: SchemaMetadataManagerRef,
352 ) -> bool {
353 let Some(status) = self.region_status.get_mut(®ion_id) else {
354 return false;
355 };
356
357 let request = status.new_compaction_request(
359 self.request_sender.clone(),
360 OptionOutputTx::none(),
361 self.engine_config.clone(),
362 self.cache_manager.clone(),
363 manifest_ctx,
364 self.listener.clone(),
365 schema_metadata_manager,
366 MAX_PARALLEL_COMPACTION,
367 );
368
369 match self
371 .schedule_compaction_request(
372 request,
373 compact_request::Options::Regular(Default::default()),
374 )
375 .await
376 {
377 Ok(Some(active_compaction)) => {
378 self.region_status
379 .get_mut(®ion_id)
380 .unwrap()
381 .active_compaction = Some(active_compaction);
382 debug!(
383 "Successfully scheduled next compaction for region id: {}",
384 region_id
385 );
386 true
387 }
388 Ok(None) => {
389 self.region_status.remove(®ion_id);
393 false
394 }
395 Err(e) => {
396 error!(e; "Failed to schedule next compaction for region {}", region_id);
397 self.remove_region_on_failure(region_id, Arc::new(e));
398 false
399 }
400 }
401 }
402
    /// Notifies the scheduler that the compaction of the region was cancelled.
    ///
    /// Removes the region from the scheduler and returns its pending DDL requests
    /// (empty when the region was not tracked).
    pub(crate) async fn on_compaction_cancelled(
        &mut self,
        region_id: RegionId,
    ) -> Vec<SenderDdlRequest> {
        self.remove_region_on_cancel(region_id)
    }
410
    /// Notifies the scheduler that the compaction of the region failed.
    ///
    /// Logs the error, then removes the region and fails everything waiting on it with `err`.
    pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to compact, cancel all pending tasks", region_id);
        self.remove_region_on_failure(region_id, err);
    }
416
417 pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
419 self.remove_region_on_failure(
420 region_id,
421 Arc::new(RegionDroppedSnafu { region_id }.build()),
422 );
423 }
424
425 pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
427 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
428 }
429
430 pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
432 self.remove_region_on_failure(
433 region_id,
434 Arc::new(RegionTruncatedSnafu { region_id }.build()),
435 );
436 }
437
438 pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
443 debug!(
444 "Added pending DDL request for region: {}, ddl: {:?}",
445 request.region_id, request.request
446 );
447 let status = self.region_status.get_mut(&request.region_id).unwrap();
448 status.pending_ddl_requests.push(request);
449 }
450
451 #[cfg(test)]
452 pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
453 let has_pending = self
454 .region_status
455 .get(®ion_id)
456 .map(|status| !status.pending_ddl_requests.is_empty())
457 .unwrap_or(false);
458 debug!(
459 "Checked pending DDL requests for region: {}, has_pending: {}",
460 region_id, has_pending
461 );
462 has_pending
463 }
464
465 pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult {
466 let Some(status) = self.region_status.get_mut(®ion_id) else {
467 return RequestCancelResult::NotRunning;
468 };
469
470 status.request_cancel()
471 }
472
    /// Picks the files to compact for `request` and starts the compaction, remotely or locally.
    ///
    /// Returns `Ok(None)` when the picker selects nothing; every waiter of the request is then
    /// answered with `Ok(0)`. Returns `Ok(Some(ActiveCompaction::Remote))` when a remote job was
    /// scheduled and `Ok(Some(ActiveCompaction::Local { .. }))` when a local task was submitted
    /// to the scheduler.
    ///
    /// # Errors
    ///
    /// Fails with a remote compaction error when scheduling the remote job fails and falling back
    /// to local compaction is disabled, or with the scheduler's error when the local task cannot
    /// be submitted.
    async fn schedule_compaction_request(
        &mut self,
        request: CompactionRequest,
        options: compact_request::Options,
    ) -> Result<Option<ActiveCompaction>> {
        let region_id = request.region_id();
        // Resolving dynamic options is best effort: on failure fall back to the options stored
        // in the region version.
        let (dynamic_compaction_opts, ttl) = find_dynamic_options(
            region_id,
            &request.current_version.options,
            &request.schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to find dynamic options for region: {}", region_id);
            (
                request.current_version.options.compaction.clone(),
                request.current_version.options.ttl.unwrap_or_default(),
            )
        });

        let picker = new_picker(
            &options,
            &dynamic_compaction_opts,
            request.current_version.options.append_mode,
            Some(self.engine_config.max_background_compactions),
        );
        // NOTE(review): `region_id` was already bound above; this rebinding yields the same value.
        let region_id = request.region_id();
        let CompactionRequest {
            engine_config,
            current_version,
            access_layer,
            request_sender,
            waiters,
            start_time,
            cache_manager,
            manifest_ctx,
            listener,
            schema_metadata_manager: _,
            max_parallelism,
        } = request;

        debug!(
            "Pick compaction strategy {:?} for region: {}, ttl: {:?}",
            picker, region_id, ttl
        );

        // The region options carry the resolved (dynamic) compaction options instead of the ones
        // stored in the version.
        let compaction_region = CompactionRegion {
            region_id,
            current_version: current_version.clone(),
            region_options: RegionOptions {
                compaction: dynamic_compaction_opts.clone(),
                ..current_version.options.clone()
            },
            engine_config: engine_config.clone(),
            region_metadata: current_version.metadata.clone(),
            cache_manager: cache_manager.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: manifest_ctx.clone(),
            file_purger: None,
            ttl: Some(ttl),
            max_parallelism,
            plugins: self.plugins.clone(),
        };

        // The timer is scoped to the block so that only picking is measured.
        let picker_output = {
            let _pick_timer = COMPACTION_STAGE_ELAPSED
                .with_label_values(&["pick"])
                .start_timer();
            picker.pick(&compaction_region)
        };

        let picker_output = if let Some(picker_output) = picker_output {
            picker_output
        } else {
            // Nothing to compact: answer all waiters right away.
            for waiter in waiters {
                waiter.send(Ok(0));
            }
            return Ok(None);
        };

        // Try remote compaction first when enabled. Every path that continues with local
        // compaction has to hand the waiters back.
        let waiters = if dynamic_compaction_opts.remote_compaction() {
            if let Some(remote_job_scheduler) = &self.plugins.get::<RemoteJobSchedulerRef>() {
                let remote_compaction_job = CompactionJob {
                    compaction_region: compaction_region.clone(),
                    picker_output: picker_output.clone(),
                    start_time,
                    waiters,
                    ttl,
                };

                let result = remote_job_scheduler
                    .schedule(
                        RemoteJob::CompactionJob(remote_compaction_job),
                        Box::new(DefaultNotifier {
                            request_sender: request_sender.clone(),
                        }),
                    )
                    .await;

                match result {
                    Ok(job_id) => {
                        info!(
                            "Scheduled remote compaction job {} for region {}",
                            job_id, region_id
                        );
                        INFLIGHT_COMPACTION_COUNT.inc();
                        return Ok(Some(ActiveCompaction::Remote));
                    }
                    Err(e) => {
                        if !dynamic_compaction_opts.fallback_to_local() {
                            error!(e; "Failed to schedule remote compaction job for region {}", region_id);
                            return RemoteCompactionSnafu {
                                region_id,
                                job_id: None,
                                reason: e.reason,
                            }
                            .fail();
                        }

                        error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

                        // The waiters were moved into the job; the error hands them back.
                        e.waiters
                    }
                }
            } else {
                debug!(
                    "Remote compaction is not enabled, fallback to local compaction for region {}",
                    region_id
                );
                waiters
            }
        } else {
            waiters
        };

        let estimated_bytes = estimate_compaction_bytes(&picker_output);

        // The cancel handle is shared by the task state and the compactor.
        let cancel_handle = Arc::new(CancellationHandle::default());
        let state = LocalCompactionState::new(cancel_handle.clone());
        let local_compaction_task = Box::new(CompactionTaskImpl {
            state: state.clone(),
            request_sender,
            waiters,
            start_time,
            listener,
            picker_output,
            compaction_region,
            compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())),
            memory_manager: self.memory_manager.clone(),
            memory_policy: self.memory_policy,
            estimated_memory_bytes: estimated_bytes,
        });

        self.submit_compaction_task(local_compaction_task, region_id)
            .map(|_| Some(ActiveCompaction::Local { state }))
    }
638
639 fn submit_compaction_task(
640 &mut self,
641 mut task: Box<CompactionTaskImpl>,
642 region_id: RegionId,
643 ) -> Result<()> {
644 self.scheduler
645 .schedule(Box::pin(async move {
646 INFLIGHT_COMPACTION_COUNT.inc();
647 task.run().await;
648 INFLIGHT_COMPACTION_COUNT.dec();
649 }))
650 .inspect_err(
651 |e| error!(e; "Failed to submit compaction request for region {}", region_id),
652 )
653 }
654
655 fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
656 let Some(status) = self.region_status.remove(®ion_id) else {
658 return;
659 };
660
661 status.on_failure(err);
663 }
664
665 fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec<SenderDdlRequest> {
666 let Some(status) = self.region_status.remove(®ion_id) else {
667 return Vec::new();
668 };
669
670 status.on_cancel()
671 }
672}
673
/// Shared state of a local compaction task, used to coordinate cancellation with commit.
#[derive(Debug, Clone)]
pub(crate) struct LocalCompactionState {
    /// Handle used to signal cancellation of the task.
    cancel_handle: Arc<CancellationHandle>,
    /// Set to true once the commit was marked as started; the mutex also serializes
    /// `mark_commit_started` and `request_cancel`.
    commit_started: Arc<Mutex<bool>>,
}
679
/// The compaction currently running for a region.
#[derive(Debug)]
enum ActiveCompaction {
    /// A task submitted to the local scheduler; `state` allows requesting its cancellation.
    Local { state: LocalCompactionState },
    /// A job handed to the remote job scheduler.
    Remote,
}
685
686impl LocalCompactionState {
687 fn new(cancel_handle: Arc<CancellationHandle>) -> Self {
688 Self {
689 cancel_handle,
690 commit_started: Arc::new(Mutex::new(false)),
691 }
692 }
693
694 pub(crate) fn cancel_handle(&self) -> Arc<CancellationHandle> {
696 self.cancel_handle.clone()
697 }
698
699 pub(crate) fn mark_commit_started(&self) -> bool {
705 let mut commit_started = self.commit_started.lock().unwrap();
706 if self.cancel_handle.is_cancelled() {
707 return false;
708 }
709 *commit_started = true;
710 true
711 }
712
713 pub(crate) fn request_cancel(&self) -> RequestCancelResult {
715 let commit_started = self.commit_started.lock().unwrap();
717 if *commit_started {
718 return RequestCancelResult::TooLateToCancel;
719 }
720 if self.cancel_handle.is_cancelled() {
721 return RequestCancelResult::AlreadyCancelling;
722 }
723
724 self.cancel_handle.cancel();
725 RequestCancelResult::CancelIssued
726 }
727}
728
/// Outcome of a request to cancel a compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestCancelResult {
    /// Cancellation was signalled by this request.
    CancelIssued,
    /// Cancellation had already been signalled before this request.
    AlreadyCancelling,
    /// The compaction cannot be cancelled anymore: a local task already started to commit, or
    /// the compaction is a remote job.
    TooLateToCancel,
    /// There is no active compaction for the region.
    NotRunning,
}
736
737impl Drop for CompactionScheduler {
738 fn drop(&mut self) {
739 for (region_id, status) in self.region_status.drain() {
740 status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
742 }
743 }
744}
745
746async fn find_dynamic_options(
748 region_id: RegionId,
749 region_options: &crate::region::options::RegionOptions,
750 schema_metadata_manager: &SchemaMetadataManagerRef,
751) -> Result<(crate::region::options::CompactionOptions, TimeToLive)> {
752 let table_id = region_id.table_id();
753 if let (true, Some(ttl)) = (region_options.compaction_override, region_options.ttl) {
754 debug!(
755 "Use region options directly for table {}: compaction={:?}, ttl={:?}",
756 table_id, region_options.compaction, region_options.ttl
757 );
758 return Ok((region_options.compaction.clone(), ttl));
759 }
760
761 let db_options = tokio::time::timeout(
762 crate::config::FETCH_OPTION_TIMEOUT,
763 schema_metadata_manager.get_schema_options_by_table_id(table_id),
764 )
765 .await
766 .context(TimeoutSnafu)?
767 .context(GetSchemaMetadataSnafu)?;
768
769 let ttl = if let Some(ttl) = region_options.ttl {
770 debug!(
771 "Use region TTL directly for table {}: ttl={:?}",
772 table_id, region_options.ttl
773 );
774 ttl
775 } else {
776 db_options
777 .as_ref()
778 .and_then(|options| options.ttl)
779 .unwrap_or_default()
780 .into()
781 };
782
783 let compaction = if !region_options.compaction_override {
784 if let Some(schema_opts) = db_options {
785 let map: HashMap<String, String> = schema_opts
786 .extra_options
787 .iter()
788 .filter_map(|(k, v)| {
789 if k.starts_with("compaction.") {
790 Some((k.clone(), v.clone()))
791 } else {
792 None
793 }
794 })
795 .collect();
796 if map.is_empty() {
797 region_options.compaction.clone()
798 } else {
799 crate::region::options::RegionOptions::try_from_options(region_id, &map)
800 .map(|o| o.compaction)
801 .unwrap_or_else(|e| {
802 error!(e; "Failed to create RegionOptions from map");
803 region_options.compaction.clone()
804 })
805 }
806 } else {
807 debug!(
808 "DB options is None for table {}, use region compaction: compaction={:?}",
809 table_id, region_options.compaction
810 );
811 region_options.compaction.clone()
812 }
813 } else {
814 debug!(
815 "No schema options for table {}, use region compaction: compaction={:?}",
816 table_id, region_options.compaction
817 );
818 region_options.compaction.clone()
819 };
820
821 debug!(
822 "Resolved dynamic options for table {}: compaction={:?}, ttl={:?}",
823 table_id, compaction, ttl
824 );
825 Ok((compaction, ttl))
826}
827
/// Compaction bookkeeping of a single region.
struct CompactionStatus {
    /// Id of the region.
    region_id: RegionId,
    /// Version control of the region; read to capture the current version for a request.
    version_control: VersionControlRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Result senders merged from requests that arrived while a compaction was tracked.
    waiters: Vec<OutputTx>,
    /// The latest strict-window request that arrived while a compaction was tracked.
    pending_request: Option<PendingCompaction>,
    /// DDL requests queued until the compaction is done.
    pending_ddl_requests: Vec<SenderDdlRequest>,
    /// The compaction currently running for the region, if any.
    active_compaction: Option<ActiveCompaction>,
}
845
846impl CompactionStatus {
847 fn new(
849 region_id: RegionId,
850 version_control: VersionControlRef,
851 access_layer: AccessLayerRef,
852 ) -> CompactionStatus {
853 CompactionStatus {
854 region_id,
855 version_control,
856 access_layer,
857 waiters: Vec::new(),
858 pending_request: None,
859 pending_ddl_requests: Vec::new(),
860 active_compaction: None,
861 }
862 }
863
864 #[cfg(test)]
865 fn start_local_task(&mut self) -> LocalCompactionState {
866 let state = LocalCompactionState::new(Arc::new(CancellationHandle::default()));
867 self.active_compaction = Some(ActiveCompaction::Local {
868 state: state.clone(),
869 });
870 state
871 }
872
873 #[cfg(test)]
874 fn start_remote_task(&mut self) {
875 self.active_compaction = Some(ActiveCompaction::Remote);
876 }
877
878 fn request_cancel(&mut self) -> RequestCancelResult {
879 let Some(active_compaction) = &self.active_compaction else {
880 return RequestCancelResult::NotRunning;
881 };
882
883 match active_compaction {
884 ActiveCompaction::Local { state, .. } => state.request_cancel(),
885 ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel,
886 }
887 }
888
889 fn clear_running_task(&mut self) -> bool {
890 self.active_compaction.take().is_some()
891 }
892
893 fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
895 if let Some(waiter) = waiter.take_inner() {
896 self.waiters.push(waiter);
897 }
898 }
899
900 fn set_pending_request(&mut self, pending: PendingCompaction) {
902 if let Some(prev) = self.pending_request.replace(pending) {
903 debug!(
904 "Replace pending compaction options with new request {:?} for region: {}",
905 prev.options, self.region_id
906 );
907 prev.waiter.send(ManualCompactionOverrideSnafu.fail());
908 }
909 }
910
911 fn on_failure(mut self, err: Arc<Error>) {
912 for waiter in self.waiters.drain(..) {
913 waiter.send(Err(err.clone()).context(CompactRegionSnafu {
914 region_id: self.region_id,
915 }));
916 }
917
918 if let Some(pending_compaction) = self.pending_request {
919 pending_compaction
920 .waiter
921 .send(Err(err.clone()).context(CompactRegionSnafu {
922 region_id: self.region_id,
923 }));
924 }
925
926 for pending_ddl in self.pending_ddl_requests {
927 pending_ddl
928 .sender
929 .send(Err(err.clone()).context(CompactRegionSnafu {
930 region_id: self.region_id,
931 }));
932 }
933 }
934
935 #[must_use]
936 fn on_cancel(mut self) -> Vec<SenderDdlRequest> {
937 for waiter in self.waiters.drain(..) {
938 waiter.send(CompactionCancelledSnafu.fail());
939 }
940
941 if let Some(pending_compaction) = self.pending_request {
942 pending_compaction.waiter.send(
943 Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu {
944 region_id: self.region_id,
945 }),
946 );
947 }
948
949 std::mem::take(&mut self.pending_ddl_requests)
950 }
951
952 #[allow(clippy::too_many_arguments)]
956 fn new_compaction_request(
957 &mut self,
958 request_sender: Sender<WorkerRequestWithTime>,
959 mut waiter: OptionOutputTx,
960 engine_config: Arc<MitoConfig>,
961 cache_manager: CacheManagerRef,
962 manifest_ctx: &ManifestContextRef,
963 listener: WorkerListener,
964 schema_metadata_manager: SchemaMetadataManagerRef,
965 max_parallelism: usize,
966 ) -> CompactionRequest {
967 let current_version = CompactionVersion::from(self.version_control.current().version);
968 let start_time = Instant::now();
969 let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
970 waiters.extend(std::mem::take(&mut self.waiters));
971
972 if let Some(waiter) = waiter.take_inner() {
973 waiters.push(waiter);
974 }
975
976 CompactionRequest {
977 engine_config,
978 current_version,
979 access_layer: self.access_layer.clone(),
980 request_sender: request_sender.clone(),
981 waiters,
982 start_time,
983 cache_manager,
984 manifest_ctx: manifest_ctx.clone(),
985 listener,
986 schema_metadata_manager,
987 max_parallelism,
988 }
989 }
990}
991
/// One output of a compaction: a set of input files merged into a single target level.
#[derive(Debug, Clone)]
pub struct CompactionOutput {
    /// Level of the compaction output.
    pub output_level: Level,
    /// Input files of the compaction.
    pub inputs: Vec<FileHandle>,
    /// Flag forwarded to the reader's `filter_deleted` setting
    /// (presumably whether deleted rows are dropped from the output — confirm).
    pub filter_deleted: bool,
    /// Optional time range of the output.
    pub output_time_range: Option<TimestampRange>,
}
1003
/// Serializable counterpart of [`CompactionOutput`] that stores file metadata instead of file
/// handles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedCompactionOutput {
    /// Level of the compaction output.
    output_level: Level,
    /// Metadata of the input files.
    inputs: Vec<FileMeta>,
    /// Same flag as `CompactionOutput::filter_deleted`.
    filter_deleted: bool,
    /// Optional time range of the output.
    output_time_range: Option<TimestampRange>,
}
1012
/// Builder of the reader that scans the input SST files of a compaction.
struct CompactionSstReaderBuilder<'a> {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Access layer used to read the SST files.
    sst_layer: AccessLayerRef,
    /// Cache manager, used with the compaction cache strategy.
    cache: CacheManagerRef,
    /// Input files to read.
    inputs: &'a [FileHandle],
    /// Forwarded to the scan input's append mode setting.
    append_mode: bool,
    /// Forwarded to the scan input's `filter_deleted` setting.
    filter_deleted: bool,
    /// When set, the scan is restricted to this time range through a predicate.
    time_range: Option<TimestampRange>,
    /// Forwarded to the scan input's merge mode setting.
    merge_mode: MergeMode,
}
1024
1025impl CompactionSstReaderBuilder<'_> {
1026 async fn build_flat_sst_reader(self) -> Result<FlatSource> {
1029 let scan_input = self.build_scan_input().await?.with_compaction(true);
1030
1031 let schema = scan_input.mapper.output_schema();
1032 let schema = schema.arrow_schema();
1033
1034 let stream = SeqScan::new(scan_input)
1035 .build_flat_reader_for_compaction()
1036 .await?;
1037 Ok(FlatSource::new_stream(schema.clone(), stream))
1038 }
1039
1040 async fn build_scan_input(self) -> Result<ScanInput> {
1041 let schema = self.metadata.schema.arrow_schema();
1042 let json_type_hint = if schema.fields().iter().any(is_structured_json_field) {
1043 let mut json_type_hint = schema
1044 .fields()
1045 .iter()
1046 .filter(|&field| is_structured_json_field(field))
1047 .map(|field| (field.name().clone(), JsonNativeType::Null))
1048 .collect::<HashMap<_, _>>();
1049
1050 let schemas = self.collect_arrow_schemas_from_parquet().await?;
1051 for schema in schemas {
1052 for field in schema.fields() {
1053 let Some(merged) = json_type_hint.get_mut(field.name()) else {
1054 continue;
1055 };
1056
1057 let json_type = JsonNativeType::try_from(field.data_type())
1058 .context(DataTypeMismatchSnafu)?;
1059 merged.merge(&json_type);
1060 }
1061 }
1062
1063 Some(json_type_hint)
1064 } else {
1065 None
1066 };
1067
1068 let projection = (0..self.metadata.column_metadatas.len()).collect();
1069 let read_columns = ReadColumns::from_deduped_column_ids(
1070 self.metadata.column_metadatas.iter().map(|x| x.column_id),
1071 );
1072 let mapper = FlatProjectionMapper::new_with_read_columns(
1073 &self.metadata,
1074 projection,
1075 read_columns,
1076 json_type_hint.as_ref(),
1077 )?;
1078
1079 let mut scan_input = ScanInput::new(self.sst_layer, mapper)
1080 .with_files(self.inputs.to_vec())
1081 .with_append_mode(self.append_mode)
1082 .with_cache(CacheStrategy::Compaction(self.cache))
1084 .with_filter_deleted(self.filter_deleted)
1085 .with_ignore_file_not_found(true)
1087 .with_merge_mode(self.merge_mode);
1088
1089 if let Some(time_range) = self.time_range {
1092 scan_input =
1093 scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
1094 }
1095
1096 Ok(scan_input)
1097 }
1098
1099 async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
1100 let mut schemas = Vec::with_capacity(self.inputs.len());
1101
1102 for file_handle in self.inputs {
1103 let file_path =
1104 file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
1105 let file_size = file_handle.meta_ref().file_size;
1106 let parquet_metadata = match self
1107 .sst_layer
1108 .read_sst(file_handle.clone())
1109 .cache(CacheStrategy::Compaction(self.cache.clone()))
1110 .read_parquet_metadata(
1111 &file_path,
1112 file_size,
1113 &mut MetadataCacheMetrics::default(),
1114 PageIndexPolicy::default(),
1115 )
1116 .await
1117 .map(|x| x.0.parquet_metadata())
1118 {
1119 Ok(x) => x,
1120 Err(e) if e.is_object_not_found() => continue,
1121 Err(e) => return Err(e),
1122 };
1123 let file_metadata = parquet_metadata.file_metadata();
1124
1125 let schema = parquet_to_arrow_schema(
1126 file_metadata.schema_descr(),
1127 file_metadata.key_value_metadata(),
1128 )
1129 .context(ParquetToArrowSchemaSnafu { file: file_path })?;
1130
1131 schemas.push(schema);
1132 }
1133 Ok(schemas)
1134 }
1135}
1136
1137fn time_range_to_predicate(
1139 range: TimestampRange,
1140 metadata: &RegionMetadataRef,
1141) -> Result<PredicateGroup> {
1142 let ts_col = metadata.time_index_column();
1143
1144 let ts_col_unit = ts_col
1146 .column_schema
1147 .data_type
1148 .as_timestamp()
1149 .unwrap()
1150 .unit();
1151
1152 let exprs = match (range.start(), range.end()) {
1153 (Some(start), Some(end)) => {
1154 vec![
1155 datafusion_expr::col(ts_col.column_schema.name.clone())
1156 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1157 datafusion_expr::col(ts_col.column_schema.name.clone())
1158 .lt(ts_to_lit(*end, ts_col_unit)?),
1159 ]
1160 }
1161 (Some(start), None) => {
1162 vec![
1163 datafusion_expr::col(ts_col.column_schema.name.clone())
1164 .gt_eq(ts_to_lit(*start, ts_col_unit)?),
1165 ]
1166 }
1167
1168 (None, Some(end)) => {
1169 vec![
1170 datafusion_expr::col(ts_col.column_schema.name.clone())
1171 .lt(ts_to_lit(*end, ts_col_unit)?),
1172 ]
1173 }
1174 (None, None) => {
1175 return Ok(PredicateGroup::default());
1176 }
1177 };
1178
1179 let predicate = PredicateGroup::new(metadata, &exprs)?;
1180 Ok(predicate)
1181}
1182
1183fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
1184 let ts = ts
1185 .convert_to(ts_col_unit)
1186 .context(TimeRangePredicateOverflowSnafu {
1187 timestamp: ts,
1188 unit: ts_col_unit,
1189 })?;
1190 let val = ts.value();
1191 let scalar_value = match ts_col_unit {
1192 TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
1193 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
1194 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
1195 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
1196 };
1197 Ok(datafusion_expr::lit(scalar_value))
1198}
1199
1200fn get_expired_ssts(
1202 levels: &[LevelMeta],
1203 ttl: Option<TimeToLive>,
1204 now: Timestamp,
1205) -> Vec<FileHandle> {
1206 let Some(ttl) = ttl else {
1207 return vec![];
1208 };
1209
1210 levels
1211 .iter()
1212 .flat_map(|l| l.get_expired_files(&now, &ttl).into_iter())
1213 .collect()
1214}
1215
1216fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
1219 picker_output
1220 .outputs
1221 .iter()
1222 .flat_map(|output| output.inputs.iter())
1223 .map(|file: &FileHandle| {
1224 let meta = file.meta_ref();
1225 meta.max_row_group_uncompressed_size
1226 })
1227 .sum()
1228}
1229
/// A compaction request stored on a region's compaction status to be handled later.
///
/// NOTE(review): the tests in this file set it while a compaction task is already
/// running for the region — confirm the exact scheduling contract against the scheduler.
struct PendingCompaction {
    /// Compaction options of the stored request.
    pub(crate) options: compact_request::Options,
    /// Sender used to report the outcome of the stored request to its waiter.
    pub(crate) waiter: OptionOutputTx,
    /// Presumably the maximum parallelism to use when the request is eventually
    /// scheduled — TODO confirm against the scheduler.
    pub(crate) max_parallelism: usize,
}
1240
1241#[cfg(test)]
1242mod tests {
1243 use std::assert_matches;
1244 use std::time::Duration;
1245
1246 use api::v1::region::StrictWindow;
1247 use common_datasource::compression::CompressionType;
1248 use common_meta::key::schema_name::SchemaNameValue;
1249 use common_time::DatabaseTimeToLive;
1250 use tokio::sync::{Barrier, oneshot};
1251
1252 use super::*;
1253 use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
1254 use crate::error::InvalidSchedulerStateSnafu;
1255 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1256 use crate::region::ManifestContext;
1257 use crate::schedule::scheduler::{Job, Scheduler};
1258 use crate::sst::FormatType;
1259 use crate::test_util::mock_schema_metadata_manager;
1260 use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1261 use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
1262
    /// A [`Scheduler`] that rejects every job; used to exercise the scheduling
    /// failure path.
    struct FailingScheduler;

    #[async_trait::async_trait]
    impl Scheduler for FailingScheduler {
        /// Always fails with the error built from `InvalidSchedulerStateSnafu`.
        fn schedule(&self, _job: Job) -> Result<()> {
            InvalidSchedulerStateSnafu.fail()
        }

        /// Stopping does nothing and always succeeds.
        async fn stop(&self, _await_termination: bool) -> Result<()> {
            Ok(())
        }
    }
1275
1276 #[tokio::test]
1277 async fn test_find_compaction_options_db_level() {
1278 let env = SchedulerEnv::new().await;
1279 let builder = VersionControlBuilder::new();
1280 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1281 let region_id = builder.region_id();
1282 let table_id = region_id.table_id();
1283 let mut schema_value = SchemaNameValue {
1285 ttl: Some(DatabaseTimeToLive::default()),
1286 ..Default::default()
1287 };
1288 schema_value
1289 .extra_options
1290 .insert("compaction.type".to_string(), "twcs".to_string());
1291 schema_value
1292 .extra_options
1293 .insert("compaction.twcs.time_window".to_string(), "2h".to_string());
1294 schema_metadata_manager
1295 .register_region_table_info(
1296 table_id,
1297 "t",
1298 "c",
1299 "s",
1300 Some(schema_value),
1301 kv_backend.clone(),
1302 )
1303 .await;
1304
1305 let version_control = Arc::new(builder.build());
1306 let region_opts = version_control.current().version.options.clone();
1307 let (opts, _) = find_dynamic_options(region_id, ®ion_opts, &schema_metadata_manager)
1308 .await
1309 .unwrap();
1310 match opts {
1311 crate::region::options::CompactionOptions::Twcs(t) => {
1312 assert_eq!(t.time_window_seconds(), Some(2 * 3600));
1313 }
1314 }
1315 let manifest_ctx = env
1316 .mock_manifest_context(version_control.current().version.metadata.clone())
1317 .await;
1318 let (tx, _rx) = mpsc::channel(4);
1319 let mut scheduler = env.mock_compaction_scheduler(tx);
1320 let (otx, _orx) = oneshot::channel();
1321 let request = scheduler
1322 .region_status
1323 .entry(region_id)
1324 .or_insert_with(|| {
1325 crate::compaction::CompactionStatus::new(
1326 region_id,
1327 version_control.clone(),
1328 env.access_layer.clone(),
1329 )
1330 })
1331 .new_compaction_request(
1332 scheduler.request_sender.clone(),
1333 OptionOutputTx::new(Some(OutputTx::new(otx))),
1334 scheduler.engine_config.clone(),
1335 scheduler.cache_manager.clone(),
1336 &manifest_ctx,
1337 scheduler.listener.clone(),
1338 schema_metadata_manager.clone(),
1339 1,
1340 );
1341 scheduler
1342 .schedule_compaction_request(
1343 request,
1344 compact_request::Options::Regular(Default::default()),
1345 )
1346 .await
1347 .unwrap();
1348 }
1349
1350 #[tokio::test]
1351 async fn test_find_compaction_options_priority() {
1352 fn schema_value_with_twcs(time_window: &str) -> SchemaNameValue {
1353 let mut schema_value = SchemaNameValue {
1354 ttl: Some(DatabaseTimeToLive::default()),
1355 ..Default::default()
1356 };
1357 schema_value
1358 .extra_options
1359 .insert("compaction.type".to_string(), "twcs".to_string());
1360 schema_value.extra_options.insert(
1361 "compaction.twcs.time_window".to_string(),
1362 time_window.to_string(),
1363 );
1364 schema_value
1365 }
1366
1367 let cases = [
1368 (
1369 "db options set and table override set",
1370 Some(schema_value_with_twcs("2h")),
1371 true,
1372 Some(Duration::from_secs(5 * 3600)),
1373 Some(5 * 3600),
1374 ),
1375 (
1376 "db options set and table override not set",
1377 Some(schema_value_with_twcs("2h")),
1378 false,
1379 None,
1380 Some(2 * 3600),
1381 ),
1382 (
1383 "db options not set and table override set",
1384 None,
1385 true,
1386 Some(Duration::from_secs(4 * 3600)),
1387 Some(4 * 3600),
1388 ),
1389 (
1390 "db options not set and table override not set",
1391 None,
1392 false,
1393 None,
1394 None,
1395 ),
1396 ];
1397
1398 for (case_name, schema_value, override_set, table_window, expected_window) in cases {
1399 let builder = VersionControlBuilder::new();
1400 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1401 let region_id = builder.region_id();
1402 let table_id = region_id.table_id();
1403 schema_metadata_manager
1404 .register_region_table_info(
1405 table_id,
1406 "t",
1407 "c",
1408 "s",
1409 schema_value,
1410 kv_backend.clone(),
1411 )
1412 .await;
1413
1414 let version_control = Arc::new(builder.build());
1415 let mut region_opts = version_control.current().version.options.clone();
1416 region_opts.compaction_override = override_set;
1417 if let Some(window) = table_window {
1418 let crate::region::options::CompactionOptions::Twcs(twcs) =
1419 &mut region_opts.compaction;
1420 twcs.time_window = Some(window);
1421 }
1422
1423 let (opts, _) = find_dynamic_options(region_id, ®ion_opts, &schema_metadata_manager)
1424 .await
1425 .unwrap();
1426 match opts {
1427 crate::region::options::CompactionOptions::Twcs(t) => {
1428 assert_eq!(t.time_window_seconds(), expected_window, "{case_name}");
1429 }
1430 }
1431 }
1432 }
1433
1434 #[tokio::test]
1435 async fn test_schedule_empty() {
1436 let env = SchedulerEnv::new().await;
1437 let (tx, _rx) = mpsc::channel(4);
1438 let mut scheduler = env.mock_compaction_scheduler(tx);
1439 let mut builder = VersionControlBuilder::new();
1440 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1441 schema_metadata_manager
1442 .register_region_table_info(
1443 builder.region_id().table_id(),
1444 "test_table",
1445 "test_catalog",
1446 "test_schema",
1447 None,
1448 kv_backend,
1449 )
1450 .await;
1451 let version_control = Arc::new(builder.build());
1453 let (output_tx, output_rx) = oneshot::channel();
1454 let waiter = OptionOutputTx::from(output_tx);
1455 let manifest_ctx = env
1456 .mock_manifest_context(version_control.current().version.metadata.clone())
1457 .await;
1458 let scheduled = scheduler
1459 .schedule_compaction(
1460 builder.region_id(),
1461 compact_request::Options::Regular(Default::default()),
1462 &version_control,
1463 &env.access_layer,
1464 waiter,
1465 &manifest_ctx,
1466 schema_metadata_manager.clone(),
1467 1,
1468 )
1469 .await
1470 .unwrap();
1471 assert!(!scheduled);
1472 let output = output_rx.await.unwrap().unwrap();
1473 assert_eq!(output, 0);
1474 assert!(scheduler.region_status.is_empty());
1475
1476 let version_control = Arc::new(builder.push_l0_file(0, 1000).build());
1478 let (output_tx, output_rx) = oneshot::channel();
1479 let waiter = OptionOutputTx::from(output_tx);
1480 let scheduled = scheduler
1481 .schedule_compaction(
1482 builder.region_id(),
1483 compact_request::Options::Regular(Default::default()),
1484 &version_control,
1485 &env.access_layer,
1486 waiter,
1487 &manifest_ctx,
1488 schema_metadata_manager,
1489 1,
1490 )
1491 .await
1492 .unwrap();
1493 assert!(!scheduled);
1494 let output = output_rx.await.unwrap().unwrap();
1495 assert_eq!(output, 0);
1496 assert!(scheduler.region_status.is_empty());
1497 }
1498
1499 #[tokio::test]
1500 async fn test_schedule_compaction_returns_true_when_task_scheduled() {
1501 let job_scheduler = Arc::new(VecScheduler::default());
1502 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1503 let (tx, _rx) = mpsc::channel(4);
1504 let mut scheduler = env.mock_compaction_scheduler(tx);
1505 let mut builder = VersionControlBuilder::new();
1506 let region_id = builder.region_id();
1507 let end = 1000 * 1000;
1508 let version_control = Arc::new(
1510 builder
1511 .push_l0_file(0, end)
1512 .push_l0_file(10, end)
1513 .push_l0_file(50, end)
1514 .push_l0_file(80, end)
1515 .push_l0_file(90, end)
1516 .build(),
1517 );
1518 let manifest_ctx = env
1519 .mock_manifest_context(version_control.current().version.metadata.clone())
1520 .await;
1521 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1522 schema_metadata_manager
1523 .register_region_table_info(
1524 region_id.table_id(),
1525 "test_table",
1526 "test_catalog",
1527 "test_schema",
1528 None,
1529 kv_backend,
1530 )
1531 .await;
1532
1533 let scheduled = scheduler
1534 .schedule_compaction(
1535 region_id,
1536 Options::Regular(Default::default()),
1537 &version_control,
1538 &env.access_layer,
1539 OptionOutputTx::none(),
1540 &manifest_ctx,
1541 schema_metadata_manager,
1542 1,
1543 )
1544 .await
1545 .unwrap();
1546
1547 assert!(scheduled);
1550 assert_eq!(1, job_scheduler.num_jobs());
1551 assert!(scheduler.region_status.contains_key(®ion_id));
1552 }
1553
    /// Exercises scheduling while a compaction is already tracked for the region:
    /// further requests are not scheduled but their waiters are kept, and a new job is
    /// submitted only after the running compaction is reported as finished.
    #[tokio::test]
    async fn test_schedule_on_finished() {
        common_telemetry::init_default_ut_logging();
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_compaction_scheduler(tx);
        let mut builder = VersionControlBuilder::new();
        let purger = builder.file_purger();
        let region_id = builder.region_id();

        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
        schema_metadata_manager
            .register_region_table_info(
                builder.region_id().table_id(),
                "test_table",
                "test_catalog",
                "test_schema",
                None,
                kv_backend,
            )
            .await;

        let end = 1000 * 1000;
        // Start with five L0 files.
        let version_control = Arc::new(
            builder
                .push_l0_file(0, end)
                .push_l0_file(10, end)
                .push_l0_file(50, end)
                .push_l0_file(80, end)
                .push_l0_file(90, end)
                .build(),
        );
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let scheduled = scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::none(),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert!(scheduled);
        // The first request is scheduled: one tracked region, one submitted job.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        let data = version_control.current();
        let file_metas: Vec<_> = data.version.ssts.levels()[0]
            .files
            .values()
            .map(|file| file.meta_ref().clone())
            .collect();

        // NOTE(review): presumably adds five new files and removes the five collected
        // above — confirm against `apply_edit`.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &file_metas,
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        // A second request while the first is still tracked is not scheduled.
        let scheduled = scheduler
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager.clone(),
                1,
            )
            .await
            .unwrap();
        assert!(!scheduled);
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Its waiter is kept on the region status instead.
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );

        // Report the running compaction as finished, then schedule the next one.
        scheduler
            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        let scheduled = scheduler
            .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager.clone())
            .await;
        assert!(scheduled);
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(2, job_scheduler.num_jobs());

        // NOTE(review): presumably adds five more files without removing any — confirm
        // against `apply_edit`.
        apply_edit(
            &version_control,
            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
            &[],
            purger.clone(),
        );
        let (tx, _rx) = oneshot::channel();
        let scheduled = scheduler
            // Another request while the second job is tracked is again not scheduled.
            .schedule_compaction(
                region_id,
                compact_request::Options::Regular(Default::default()),
                &version_control,
                &env.access_layer,
                OptionOutputTx::new(Some(OutputTx::new(tx))),
                &manifest_ctx,
                schema_metadata_manager,
                1,
            )
            .await
            .unwrap();
        assert!(!scheduled);
        assert_eq!(2, job_scheduler.num_jobs());
        assert!(
            !scheduler
                .region_status
                .get(&builder.region_id())
                .unwrap()
                .waiters
                .is_empty()
        );
    }
1693
1694 #[tokio::test]
1695 async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() {
1696 common_telemetry::init_default_ut_logging();
1697 let env = SchedulerEnv::new()
1698 .await
1699 .scheduler(Arc::new(FailingScheduler));
1700 let (tx, _rx) = mpsc::channel(4);
1701 let mut scheduler = env.mock_compaction_scheduler(tx);
1702 let mut builder = VersionControlBuilder::new();
1703 let end = 1000 * 1000;
1704 let version_control = Arc::new(
1705 builder
1706 .push_l0_file(0, end)
1707 .push_l0_file(10, end)
1708 .push_l0_file(50, end)
1709 .push_l0_file(80, end)
1710 .push_l0_file(90, end)
1711 .build(),
1712 );
1713 let region_id = builder.region_id();
1714 let manifest_ctx = env
1715 .mock_manifest_context(version_control.current().version.metadata.clone())
1716 .await;
1717 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1718 schema_metadata_manager
1719 .register_region_table_info(
1720 builder.region_id().table_id(),
1721 "test_table",
1722 "test_catalog",
1723 "test_schema",
1724 None,
1725 kv_backend,
1726 )
1727 .await;
1728
1729 let result = scheduler
1730 .schedule_compaction(
1731 region_id,
1732 compact_request::Options::Regular(Default::default()),
1733 &version_control,
1734 &env.access_layer,
1735 OptionOutputTx::none(),
1736 &manifest_ctx,
1737 schema_metadata_manager,
1738 1,
1739 )
1740 .await;
1741
1742 assert!(result.is_err());
1743 assert!(!scheduler.region_status.contains_key(®ion_id));
1744 }
1745
1746 #[tokio::test]
1747 async fn test_manual_compaction_when_compaction_in_progress() {
1748 common_telemetry::init_default_ut_logging();
1749 let job_scheduler = Arc::new(VecScheduler::default());
1750 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1751 let (tx, _rx) = mpsc::channel(4);
1752 let mut scheduler = env.mock_compaction_scheduler(tx);
1753 let mut builder = VersionControlBuilder::new();
1754 let purger = builder.file_purger();
1755 let region_id = builder.region_id();
1756
1757 let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
1758 schema_metadata_manager
1759 .register_region_table_info(
1760 builder.region_id().table_id(),
1761 "test_table",
1762 "test_catalog",
1763 "test_schema",
1764 None,
1765 kv_backend,
1766 )
1767 .await;
1768
1769 let end = 1000 * 1000;
1771 let version_control = Arc::new(
1772 builder
1773 .push_l0_file(0, end)
1774 .push_l0_file(10, end)
1775 .push_l0_file(50, end)
1776 .push_l0_file(80, end)
1777 .push_l0_file(90, end)
1778 .build(),
1779 );
1780 let manifest_ctx = env
1781 .mock_manifest_context(version_control.current().version.metadata.clone())
1782 .await;
1783
1784 let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
1785 .files
1786 .values()
1787 .map(|file| file.meta_ref().clone())
1788 .collect();
1789
1790 apply_edit(
1792 &version_control,
1793 &[(0, end), (20, end), (40, end), (60, end), (80, end)],
1794 &file_metas,
1795 purger.clone(),
1796 );
1797
1798 scheduler
1799 .schedule_compaction(
1800 region_id,
1801 compact_request::Options::Regular(Default::default()),
1802 &version_control,
1803 &env.access_layer,
1804 OptionOutputTx::none(),
1805 &manifest_ctx,
1806 schema_metadata_manager.clone(),
1807 1,
1808 )
1809 .await
1810 .unwrap();
1811 assert_eq!(1, scheduler.region_status.len());
1813 assert_eq!(1, job_scheduler.num_jobs());
1814 assert!(
1815 scheduler
1816 .region_status
1817 .get(®ion_id)
1818 .unwrap()
1819 .pending_request
1820 .is_none()
1821 );
1822
1823 let (tx, _rx) = oneshot::channel();
1825 scheduler
1826 .schedule_compaction(
1827 region_id,
1828 compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
1829 &version_control,
1830 &env.access_layer,
1831 OptionOutputTx::new(Some(OutputTx::new(tx))),
1832 &manifest_ctx,
1833 schema_metadata_manager.clone(),
1834 1,
1835 )
1836 .await
1837 .unwrap();
1838 assert_eq!(1, scheduler.region_status.len());
1839 assert_eq!(1, job_scheduler.num_jobs());
1841 let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1842 assert!(status.pending_request.is_some());
1843
1844 scheduler
1846 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
1847 .await;
1848 assert_eq!(1, scheduler.region_status.len());
1849 assert_eq!(2, job_scheduler.num_jobs());
1850
1851 let status = scheduler.region_status.get(&builder.region_id()).unwrap();
1852 assert!(status.pending_request.is_none());
1853 }
1854
1855 #[tokio::test]
1856 async fn test_compaction_bypass_in_staging_mode() {
1857 let env = SchedulerEnv::new().await;
1858 let (tx, _rx) = mpsc::channel(4);
1859 let mut scheduler = env.mock_compaction_scheduler(tx);
1860
1861 let builder = VersionControlBuilder::new();
1863 let version_control = Arc::new(builder.build());
1864 let region_id = version_control.current().version.metadata.region_id;
1865
1866 let staging_manifest_ctx = {
1868 let manager = RegionManifestManager::new(
1869 version_control.current().version.metadata.clone(),
1870 0,
1871 RegionManifestOptions {
1872 manifest_dir: "".to_string(),
1873 object_store: env.access_layer.object_store().clone(),
1874 compress_type: CompressionType::Uncompressed,
1875 checkpoint_distance: 10,
1876 remove_file_options: Default::default(),
1877 manifest_cache: None,
1878 },
1879 FormatType::PrimaryKey,
1880 &Default::default(),
1881 )
1882 .await
1883 .unwrap();
1884 Arc::new(ManifestContext::new(
1885 manager,
1886 RegionRoleState::Leader(RegionLeaderState::Staging),
1887 None,
1888 ))
1889 };
1890
1891 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
1892
1893 let (tx, rx) = oneshot::channel();
1895 scheduler
1896 .schedule_compaction(
1897 region_id,
1898 compact_request::Options::Regular(Default::default()),
1899 &version_control,
1900 &env.access_layer,
1901 OptionOutputTx::new(Some(OutputTx::new(tx))),
1902 &staging_manifest_ctx,
1903 schema_metadata_manager,
1904 1,
1905 )
1906 .await
1907 .unwrap();
1908
1909 let result = rx.await.unwrap();
1910 assert_eq!(result.unwrap(), 0); assert_eq!(0, scheduler.region_status.len());
1912 }
1913
1914 #[tokio::test]
1915 async fn test_add_ddl_request_to_pending() {
1916 let env = SchedulerEnv::new().await;
1917 let (tx, _rx) = mpsc::channel(4);
1918 let mut scheduler = env.mock_compaction_scheduler(tx);
1919 let builder = VersionControlBuilder::new();
1920 let version_control = Arc::new(builder.build());
1921 let region_id = builder.region_id();
1922
1923 scheduler.region_status.insert(
1924 region_id,
1925 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
1926 );
1927 scheduler
1928 .region_status
1929 .get_mut(®ion_id)
1930 .unwrap()
1931 .start_local_task();
1932
1933 let (output_tx, _output_rx) = oneshot::channel();
1934 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
1935 region_id,
1936 sender: OptionOutputTx::from(output_tx),
1937 request: crate::request::DdlRequest::EnterStaging(
1938 store_api::region_request::EnterStagingRequest {
1939 partition_directive:
1940 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
1941 },
1942 ),
1943 });
1944
1945 assert!(scheduler.has_pending_ddls(region_id));
1946 }
1947
1948 #[tokio::test]
1949 async fn test_request_cancel_state_transitions() {
1950 let env = SchedulerEnv::new().await;
1951 let builder = VersionControlBuilder::new();
1952 let region_id = builder.region_id();
1953 let version_control = Arc::new(builder.build());
1954 let mut status =
1955 CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1956 let state = status.start_local_task();
1957
1958 assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued);
1959 assert!(state.cancel_handle().is_cancelled());
1960 assert_eq!(
1961 status.request_cancel(),
1962 RequestCancelResult::AlreadyCancelling
1963 );
1964
1965 assert!(!state.mark_commit_started());
1966 assert_eq!(
1967 status.request_cancel(),
1968 RequestCancelResult::AlreadyCancelling
1969 );
1970
1971 assert!(status.clear_running_task());
1972 assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning);
1973 }
1974
1975 #[tokio::test]
1976 async fn test_request_cancel_remote_compaction_is_too_late() {
1977 let env = SchedulerEnv::new().await;
1978 let builder = VersionControlBuilder::new();
1979 let region_id = builder.region_id();
1980 let version_control = Arc::new(builder.build());
1981 let mut status =
1982 CompactionStatus::new(region_id, version_control, env.access_layer.clone());
1983
1984 status.start_remote_task();
1985
1986 assert_eq!(
1987 status.request_cancel(),
1988 RequestCancelResult::TooLateToCancel
1989 );
1990 assert!(status.active_compaction.is_some());
1991 }
1992
1993 #[tokio::test]
1994 async fn test_on_compaction_cancelled_returns_pending_ddl_requests() {
1995 let job_scheduler = Arc::new(VecScheduler::default());
1996 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1997 let (tx, _rx) = mpsc::channel(4);
1998 let mut scheduler = env.mock_compaction_scheduler(tx);
1999 let builder = VersionControlBuilder::new();
2000 let version_control = Arc::new(builder.build());
2001 let region_id = builder.region_id();
2002 let _manifest_ctx = env
2003 .mock_manifest_context(version_control.current().version.metadata.clone())
2004 .await;
2005 let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2006
2007 scheduler.region_status.insert(
2008 region_id,
2009 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2010 );
2011 scheduler
2012 .region_status
2013 .get_mut(®ion_id)
2014 .unwrap()
2015 .start_local_task();
2016
2017 let (output_tx, _output_rx) = oneshot::channel();
2018 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2019 region_id,
2020 sender: OptionOutputTx::from(output_tx),
2021 request: crate::request::DdlRequest::EnterStaging(
2022 store_api::region_request::EnterStagingRequest {
2023 partition_directive:
2024 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2025 },
2026 ),
2027 });
2028
2029 let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2030
2031 assert_eq!(pending_ddls.len(), 1);
2032 assert!(!scheduler.has_pending_ddls(region_id));
2033 assert!(!scheduler.region_status.contains_key(®ion_id));
2034 assert_eq!(job_scheduler.num_jobs(), 0);
2035 }
2036
2037 #[tokio::test]
2038 async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() {
2039 let job_scheduler = Arc::new(VecScheduler::default());
2040 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2041 let (tx, _rx) = mpsc::channel(4);
2042 let mut scheduler = env.mock_compaction_scheduler(tx);
2043 let builder = VersionControlBuilder::new();
2044 let version_control = Arc::new(builder.build());
2045 let region_id = builder.region_id();
2046 let _manifest_ctx = env
2047 .mock_manifest_context(version_control.current().version.metadata.clone())
2048 .await;
2049 let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2050
2051 scheduler.region_status.insert(
2052 region_id,
2053 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2054 );
2055 let status = scheduler.region_status.get_mut(®ion_id).unwrap();
2056 status.start_local_task();
2057 let (manual_tx, manual_rx) = oneshot::channel();
2058 status.set_pending_request(PendingCompaction {
2059 options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
2060 waiter: OptionOutputTx::from(manual_tx),
2061 max_parallelism: 1,
2062 });
2063
2064 let (output_tx, _output_rx) = oneshot::channel();
2065 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2066 region_id,
2067 sender: OptionOutputTx::from(output_tx),
2068 request: crate::request::DdlRequest::EnterStaging(
2069 store_api::region_request::EnterStagingRequest {
2070 partition_directive:
2071 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2072 },
2073 ),
2074 });
2075
2076 let pending_ddls = scheduler.on_compaction_cancelled(region_id).await;
2077
2078 assert_eq!(pending_ddls.len(), 1);
2079 assert!(!scheduler.region_status.contains_key(®ion_id));
2080 assert_eq!(job_scheduler.num_jobs(), 0);
2081 assert_matches!(manual_rx.await.unwrap(), Err(_));
2082 }
2083
2084 #[tokio::test]
2085 async fn test_pending_ddl_request_failed_on_compaction_failed() {
2086 let env = SchedulerEnv::new().await;
2087 let (tx, _rx) = mpsc::channel(4);
2088 let mut scheduler = env.mock_compaction_scheduler(tx);
2089 let builder = VersionControlBuilder::new();
2090 let version_control = Arc::new(builder.build());
2091 let region_id = builder.region_id();
2092
2093 scheduler.region_status.insert(
2094 region_id,
2095 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2096 );
2097
2098 let (output_tx, output_rx) = oneshot::channel();
2099 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2100 region_id,
2101 sender: OptionOutputTx::from(output_tx),
2102 request: crate::request::DdlRequest::EnterStaging(
2103 store_api::region_request::EnterStagingRequest {
2104 partition_directive:
2105 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2106 },
2107 ),
2108 });
2109
2110 assert!(scheduler.has_pending_ddls(region_id));
2111 scheduler
2112 .on_compaction_failed(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
2113
2114 assert!(!scheduler.has_pending_ddls(region_id));
2115 let result = output_rx.await.unwrap();
2116 assert_matches!(result, Err(_));
2117 }
2118
2119 #[tokio::test]
2120 async fn test_pending_ddl_request_failed_on_region_closed() {
2121 let env = SchedulerEnv::new().await;
2122 let (tx, _rx) = mpsc::channel(4);
2123 let mut scheduler = env.mock_compaction_scheduler(tx);
2124 let builder = VersionControlBuilder::new();
2125 let version_control = Arc::new(builder.build());
2126 let region_id = builder.region_id();
2127
2128 scheduler.region_status.insert(
2129 region_id,
2130 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2131 );
2132
2133 let (output_tx, output_rx) = oneshot::channel();
2134 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2135 region_id,
2136 sender: OptionOutputTx::from(output_tx),
2137 request: crate::request::DdlRequest::EnterStaging(
2138 store_api::region_request::EnterStagingRequest {
2139 partition_directive:
2140 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2141 },
2142 ),
2143 });
2144
2145 assert!(scheduler.has_pending_ddls(region_id));
2146 scheduler.on_region_closed(region_id);
2147
2148 assert!(!scheduler.has_pending_ddls(region_id));
2149 let result = output_rx.await.unwrap();
2150 assert_matches!(result, Err(_));
2151 }
2152
2153 #[tokio::test]
2154 async fn test_pending_ddl_request_failed_on_region_dropped() {
2155 let env = SchedulerEnv::new().await;
2156 let (tx, _rx) = mpsc::channel(4);
2157 let mut scheduler = env.mock_compaction_scheduler(tx);
2158 let builder = VersionControlBuilder::new();
2159 let version_control = Arc::new(builder.build());
2160 let region_id = builder.region_id();
2161
2162 scheduler.region_status.insert(
2163 region_id,
2164 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2165 );
2166
2167 let (output_tx, output_rx) = oneshot::channel();
2168 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2169 region_id,
2170 sender: OptionOutputTx::from(output_tx),
2171 request: crate::request::DdlRequest::EnterStaging(
2172 store_api::region_request::EnterStagingRequest {
2173 partition_directive:
2174 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2175 },
2176 ),
2177 });
2178
2179 assert!(scheduler.has_pending_ddls(region_id));
2180 scheduler.on_region_dropped(region_id);
2181
2182 assert!(!scheduler.has_pending_ddls(region_id));
2183 let result = output_rx.await.unwrap();
2184 assert_matches!(result, Err(_));
2185 }
2186
2187 #[tokio::test]
2188 async fn test_pending_ddl_request_failed_on_region_truncated() {
2189 let env = SchedulerEnv::new().await;
2190 let (tx, _rx) = mpsc::channel(4);
2191 let mut scheduler = env.mock_compaction_scheduler(tx);
2192 let builder = VersionControlBuilder::new();
2193 let version_control = Arc::new(builder.build());
2194 let region_id = builder.region_id();
2195
2196 scheduler.region_status.insert(
2197 region_id,
2198 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2199 );
2200
2201 let (output_tx, output_rx) = oneshot::channel();
2202 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2203 region_id,
2204 sender: OptionOutputTx::from(output_tx),
2205 request: crate::request::DdlRequest::EnterStaging(
2206 store_api::region_request::EnterStagingRequest {
2207 partition_directive:
2208 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2209 },
2210 ),
2211 });
2212
2213 assert!(scheduler.has_pending_ddls(region_id));
2214 scheduler.on_region_truncated(region_id);
2215
2216 assert!(!scheduler.has_pending_ddls(region_id));
2217 let result = output_rx.await.unwrap();
2218 assert_matches!(result, Err(_));
2219 }
2220
2221 #[tokio::test]
2222 async fn test_on_compaction_finished_returns_pending_ddl_requests() {
2223 let job_scheduler = Arc::new(VecScheduler::default());
2224 let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
2225 let (tx, _rx) = mpsc::channel(4);
2226 let mut scheduler = env.mock_compaction_scheduler(tx);
2227 let builder = VersionControlBuilder::new();
2228 let version_control = Arc::new(builder.build());
2229 let region_id = builder.region_id();
2230 let manifest_ctx = env
2231 .mock_manifest_context(version_control.current().version.metadata.clone())
2232 .await;
2233 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2234
2235 scheduler.region_status.insert(
2236 region_id,
2237 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2238 );
2239 scheduler
2240 .region_status
2241 .get_mut(®ion_id)
2242 .unwrap()
2243 .start_local_task();
2244
2245 let (output_tx, _output_rx) = oneshot::channel();
2246 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2247 region_id,
2248 sender: OptionOutputTx::from(output_tx),
2249 request: crate::request::DdlRequest::EnterStaging(
2250 store_api::region_request::EnterStagingRequest {
2251 partition_directive:
2252 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2253 },
2254 ),
2255 });
2256
2257 let pending_ddls = scheduler
2258 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2259 .await;
2260
2261 assert_eq!(pending_ddls.len(), 1);
2262 assert!(!scheduler.has_pending_ddls(region_id));
2263 assert!(!scheduler.region_status.contains_key(®ion_id));
2264 assert_eq!(job_scheduler.num_jobs(), 0);
2265 }
2266
2267 #[tokio::test]
2268 async fn test_on_compaction_finished_replays_pending_ddl_after_manual_noop() {
2269 let env = SchedulerEnv::new().await;
2270 let (tx, _rx) = mpsc::channel(4);
2271 let mut scheduler = env.mock_compaction_scheduler(tx);
2272 let builder = VersionControlBuilder::new();
2273 let version_control = Arc::new(builder.build());
2274 let region_id = builder.region_id();
2275 let manifest_ctx = env
2276 .mock_manifest_context(version_control.current().version.metadata.clone())
2277 .await;
2278 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2279
2280 let (manual_tx, manual_rx) = oneshot::channel();
2281 let mut status =
2282 CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2283 status.start_local_task();
2284 status.set_pending_request(PendingCompaction {
2285 options: compact_request::Options::Regular(Default::default()),
2286 waiter: OptionOutputTx::from(manual_tx),
2287 max_parallelism: 1,
2288 });
2289 scheduler.region_status.insert(region_id, status);
2290
2291 let (ddl_tx, _ddl_rx) = oneshot::channel();
2292 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2293 region_id,
2294 sender: OptionOutputTx::from(ddl_tx),
2295 request: crate::request::DdlRequest::EnterStaging(
2296 store_api::region_request::EnterStagingRequest {
2297 partition_directive:
2298 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2299 },
2300 ),
2301 });
2302
2303 let pending_ddls = scheduler
2304 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2305 .await;
2306
2307 assert_eq!(pending_ddls.len(), 1);
2308 assert!(!scheduler.region_status.contains_key(®ion_id));
2309 assert_eq!(manual_rx.await.unwrap().unwrap(), 0);
2310 }
2311
2312 #[tokio::test]
2313 async fn test_on_compaction_finished_returns_empty_when_region_absent() {
2314 let env = SchedulerEnv::new().await;
2315 let (tx, _rx) = mpsc::channel(4);
2316 let mut scheduler = env.mock_compaction_scheduler(tx);
2317 let builder = VersionControlBuilder::new();
2318 let region_id = builder.region_id();
2319 let version_control = Arc::new(builder.build());
2320 let manifest_ctx = env
2321 .mock_manifest_context(version_control.current().version.metadata.clone())
2322 .await;
2323 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2324
2325 let pending_ddls = scheduler
2326 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2327 .await;
2328
2329 assert!(pending_ddls.is_empty());
2330 }
2331
2332 #[tokio::test]
2333 async fn test_on_compaction_finished_manual_schedule_error_cleans_status() {
2334 let env = SchedulerEnv::new()
2335 .await
2336 .scheduler(Arc::new(FailingScheduler));
2337 let (tx, _rx) = mpsc::channel(4);
2338 let mut scheduler = env.mock_compaction_scheduler(tx);
2339 let mut builder = VersionControlBuilder::new();
2340 let end = 1000 * 1000;
2341 let version_control = Arc::new(
2342 builder
2343 .push_l0_file(0, end)
2344 .push_l0_file(10, end)
2345 .push_l0_file(50, end)
2346 .push_l0_file(80, end)
2347 .push_l0_file(90, end)
2348 .build(),
2349 );
2350 let region_id = builder.region_id();
2351 let manifest_ctx = env
2352 .mock_manifest_context(version_control.current().version.metadata.clone())
2353 .await;
2354 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2355
2356 let (manual_tx, manual_rx) = oneshot::channel();
2357 let mut status =
2358 CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone());
2359 status.start_local_task();
2360 status.set_pending_request(PendingCompaction {
2361 options: compact_request::Options::Regular(Default::default()),
2362 waiter: OptionOutputTx::from(manual_tx),
2363 max_parallelism: 1,
2364 });
2365 scheduler.region_status.insert(region_id, status);
2366
2367 let (ddl_tx, ddl_rx) = oneshot::channel();
2368 scheduler.add_ddl_request_to_pending(SenderDdlRequest {
2369 region_id,
2370 sender: OptionOutputTx::from(ddl_tx),
2371 request: crate::request::DdlRequest::EnterStaging(
2372 store_api::region_request::EnterStagingRequest {
2373 partition_directive:
2374 store_api::region_request::StagingPartitionDirective::RejectAllWrites,
2375 },
2376 ),
2377 });
2378
2379 let pending_ddls = scheduler
2380 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2381 .await;
2382
2383 assert!(pending_ddls.is_empty());
2384 assert!(!scheduler.region_status.contains_key(®ion_id));
2385 assert!(manual_rx.await.is_err());
2386 assert_matches!(ddl_rx.await.unwrap(), Err(_));
2387 }
2388
2389 #[tokio::test]
2390 async fn test_on_compaction_finished_next_schedule_noop_removes_status() {
2391 let env = SchedulerEnv::new().await;
2392 let (tx, _rx) = mpsc::channel(4);
2393 let mut scheduler = env.mock_compaction_scheduler(tx);
2394 let builder = VersionControlBuilder::new();
2395 let version_control = Arc::new(builder.build());
2396 let region_id = builder.region_id();
2397 let manifest_ctx = env
2398 .mock_manifest_context(version_control.current().version.metadata.clone())
2399 .await;
2400 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2401
2402 scheduler.region_status.insert(
2403 region_id,
2404 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2405 );
2406 scheduler
2407 .region_status
2408 .get_mut(®ion_id)
2409 .unwrap()
2410 .start_local_task();
2411
2412 let pending_ddls = scheduler
2413 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2414 .await;
2415
2416 assert!(pending_ddls.is_empty());
2417 assert!(scheduler.region_status.contains_key(®ion_id));
2418
2419 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2420 let scheduled = scheduler
2423 .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2424 .await;
2425 assert!(!scheduled);
2426 assert!(!scheduler.region_status.contains_key(®ion_id));
2427 }
2428
2429 #[tokio::test]
2430 async fn test_on_compaction_finished_next_schedule_error_cleans_status() {
2431 let env = SchedulerEnv::new()
2432 .await
2433 .scheduler(Arc::new(FailingScheduler));
2434 let (tx, _rx) = mpsc::channel(4);
2435 let mut scheduler = env.mock_compaction_scheduler(tx);
2436 let mut builder = VersionControlBuilder::new();
2437 let end = 1000 * 1000;
2438 let version_control = Arc::new(
2439 builder
2440 .push_l0_file(0, end)
2441 .push_l0_file(10, end)
2442 .push_l0_file(50, end)
2443 .push_l0_file(80, end)
2444 .push_l0_file(90, end)
2445 .build(),
2446 );
2447 let region_id = builder.region_id();
2448 let manifest_ctx = env
2449 .mock_manifest_context(version_control.current().version.metadata.clone())
2450 .await;
2451 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2452
2453 scheduler.region_status.insert(
2454 region_id,
2455 CompactionStatus::new(region_id, version_control, env.access_layer.clone()),
2456 );
2457 scheduler
2458 .region_status
2459 .get_mut(®ion_id)
2460 .unwrap()
2461 .start_local_task();
2462
2463 let pending_ddls = scheduler
2464 .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager)
2465 .await;
2466
2467 assert!(pending_ddls.is_empty());
2468 assert!(scheduler.region_status.contains_key(®ion_id));
2469
2470 let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
2471 let scheduled = scheduler
2473 .schedule_next_compaction(region_id, &manifest_ctx, schema_metadata_manager)
2474 .await;
2475 assert!(!scheduled);
2476 assert!(!scheduler.region_status.contains_key(®ion_id));
2477 }
2478
2479 #[tokio::test]
2480 async fn test_concurrent_memory_competition() {
2481 let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); let barrier = Arc::new(Barrier::new(3));
2483 let mut handles = vec![];
2484
2485 for _i in 0..3 {
2487 let mgr = manager.clone();
2488 let bar = barrier.clone();
2489 let handle = tokio::spawn(async move {
2490 bar.wait().await; mgr.try_acquire(2 * 1024 * 1024)
2492 });
2493 handles.push(handle);
2494 }
2495
2496 let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
2497 .await
2498 .into_iter()
2499 .map(|r| r.unwrap())
2500 .collect();
2501
2502 let succeeded = results.iter().filter(|r| r.is_some()).count();
2504 let failed = results.iter().filter(|r| r.is_none()).count();
2505
2506 assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
2507 assert_eq!(failed, 2, "Expected 2 tasks to fail");
2508
2509 drop(results);
2511 assert_eq!(manager.used_bytes(), 0);
2512 }
2513}