1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31
32use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
33use crate::cache::{CacheManager, CacheManagerRef};
34use crate::compaction::picker::{new_picker, PickerOutput};
35use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
36use crate::config::MitoConfig;
37use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
38use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
39use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
40use crate::manifest::storage::manifest_compress_type;
41use crate::metrics;
42use crate::read::Source;
43use crate::region::opener::new_manifest_dir;
44use crate::region::options::RegionOptions;
45use crate::region::version::VersionRef;
46use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
47use crate::schedule::scheduler::LocalScheduler;
48use crate::sst::file::FileMeta;
49use crate::sst::file_purger::LocalFilePurger;
50use crate::sst::index::intermediate::IntermediateManager;
51use crate::sst::index::puffin_manager::PuffinManagerFactory;
52use crate::sst::location::region_dir_from_table_dir;
53use crate::sst::parquet::WriteOptions;
54use crate::sst::version::{SstVersion, SstVersionRef};
55
/// The subset of a region version that compaction works on.
///
/// It can be built from a full `VersionRef` (see the `From` impl below). It can
/// also be assembled directly from a region manifest, as
/// `open_compaction_region` does.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region (TTL, append mode, index options, ...).
    pub(crate) options: RegionOptions,
    /// SST files currently present in the region.
    pub(crate) ssts: SstVersionRef,
    /// Compaction time window, if one has been recorded for the region.
    pub(crate) compaction_time_window: Option<Duration>,
}
71
72impl From<VersionRef> for CompactionVersion {
73 fn from(value: VersionRef) -> Self {
74 Self {
75 metadata: value.metadata.clone(),
76 options: value.options.clone(),
77 ssts: value.ssts.clone(),
78 compaction_time_window: value.compaction_time_window,
79 }
80 }
81}
82
/// Everything a [`Compactor`] needs to compact a single region.
///
/// [`open_compaction_region`] builds one from an [`OpenCompactionRegionRequest`].
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region (the compaction options drive picker selection).
    pub region_options: RegionOptions,

    /// Engine configuration (SST write buffer size, index configs, ...).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region, taken from its manifest.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager passed to the SST readers and writers used by compaction.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read input SSTs and write output SSTs.
    pub access_layer: AccessLayerRef,
    /// Manifest context; compaction results are committed through it.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version snapshot (metadata, options, SSTs, time window) of the region.
    pub(crate) current_version: CompactionVersion,
    /// Purger attached to the SST files of `current_version`, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if resolved.
    pub(crate) ttl: Option<TimeToLive>,

    /// Maximum number of merge tasks `DefaultCompactor::merge_ssts` runs at the
    /// same time; values below 1 are treated as 1.
    pub max_parallelism: usize,
}
106
/// Parameters needed by [`open_compaction_region`] to open a region for compaction.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Directory of the table; the region directory is derived from it.
    pub table_dir: String,
    /// Path type used to derive the region directory from `table_dir`.
    pub path_type: PathType,
    /// Options of the region; `storage` selects the object store (default store if `None`).
    pub region_options: RegionOptions,
    /// Copied into [`CompactionRegion::max_parallelism`].
    pub max_parallelism: usize,
}
116
117pub async fn open_compaction_region(
120 req: &OpenCompactionRegionRequest,
121 mito_config: &MitoConfig,
122 object_store_manager: ObjectStoreManagerRef,
123 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
124) -> Result<CompactionRegion> {
125 let object_store = {
126 let name = &req.region_options.storage;
127 if let Some(name) = name {
128 object_store_manager
129 .find(name)
130 .context(ObjectStoreNotFoundSnafu {
131 object_store: name.to_string(),
132 })?
133 } else {
134 object_store_manager.default_object_store()
135 }
136 };
137
138 let access_layer = {
139 let puffin_manager_factory = PuffinManagerFactory::new(
140 &mito_config.index.aux_path,
141 mito_config.index.staging_size.as_bytes(),
142 Some(mito_config.index.write_buffer_size.as_bytes() as _),
143 mito_config.index.staging_ttl,
144 )
145 .await?;
146 let intermediate_manager =
147 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
148
149 Arc::new(AccessLayer::new(
150 &req.table_dir,
151 req.path_type,
152 object_store.clone(),
153 puffin_manager_factory,
154 intermediate_manager,
155 ))
156 };
157
158 let manifest_manager = {
159 let region_manifest_options = RegionManifestOptions {
160 manifest_dir: new_manifest_dir(®ion_dir_from_table_dir(
161 &req.table_dir,
162 req.region_id,
163 req.path_type,
164 )),
165 object_store: object_store.clone(),
166 compress_type: manifest_compress_type(mito_config.compress_manifest),
167 checkpoint_distance: mito_config.manifest_checkpoint_distance,
168 remove_file_options: RemoveFileOptions {
169 keep_count: mito_config.experimental_manifest_keep_removed_file_count,
170 keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
171 },
172 };
173
174 RegionManifestManager::open(
175 region_manifest_options,
176 Default::default(),
177 Default::default(),
178 )
179 .await?
180 .context(EmptyRegionDirSnafu {
181 region_id: req.region_id,
182 region_dir: ®ion_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
183 })?
184 };
185
186 let manifest = manifest_manager.manifest();
187 let region_metadata = manifest.metadata.clone();
188 let manifest_ctx = Arc::new(ManifestContext::new(
189 manifest_manager,
190 RegionRoleState::Leader(RegionLeaderState::Writable),
191 ));
192
193 let file_purger = {
194 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
195 Arc::new(LocalFilePurger::new(
196 purge_scheduler.clone(),
197 access_layer.clone(),
198 None,
199 ))
200 };
201
202 let current_version = {
203 let mut ssts = SstVersion::new();
204 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
205 CompactionVersion {
206 metadata: region_metadata.clone(),
207 options: req.region_options.clone(),
208 ssts: Arc::new(ssts),
209 compaction_time_window: manifest.compaction_time_window,
210 }
211 };
212
213 let ttl = match ttl_provider {
214 Either::Left(ttl) => ttl,
216 Either::Right(schema_metadata_manager) => find_ttl(
218 req.region_id.table_id(),
219 current_version.options.ttl,
220 &schema_metadata_manager,
221 )
222 .await
223 .unwrap_or_else(|e| {
224 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
225 TimeToLive::default()
226 }),
227 };
228
229 Ok(CompactionRegion {
230 region_id: req.region_id,
231 region_options: req.region_options.clone(),
232 engine_config: Arc::new(mito_config.clone()),
233 region_metadata: region_metadata.clone(),
234 cache_manager: Arc::new(CacheManager::default()),
235 access_layer,
236 manifest_ctx,
237 current_version,
238 file_purger: Some(file_purger),
239 ttl: Some(ttl),
240 max_parallelism: req.max_parallelism,
241 })
242}
243
244impl CompactionRegion {
245 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
247 self.file_purger.clone()
248 }
249
250 pub async fn stop_purger_scheduler(&self) -> Result<()> {
252 if let Some(file_purger) = &self.file_purger {
253 file_purger.stop_scheduler().await
254 } else {
255 Ok(())
256 }
257 }
258}
259
/// Result of merging SSTs: which files to add to and remove from the region.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Newly written SST files.
    pub files_to_add: Vec<FileMeta>,
    /// Input files consumed by the merge (plus any expired files).
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds (`DefaultCompactor::update_manifest`
    /// converts it with `Duration::from_secs`).
    pub compaction_time_window: Option<i64>,
}
267
268impl MergeOutput {
269 pub fn is_empty(&self) -> bool {
270 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
271 }
272
273 pub fn input_file_size(&self) -> u64 {
274 self.files_to_remove.iter().map(|f| f.file_size).sum()
275 }
276
277 pub fn output_file_size(&self) -> u64 {
278 self.files_to_add.iter().map(|f| f.file_size).sum()
279 }
280}
281
/// Compacts the SST files of a region.
///
/// [`DefaultCompactor::compact`] runs three steps:
/// 1. pick the files;
/// 2. merge them with [`Compactor::merge_ssts`];
/// 3. record the result with [`Compactor::update_manifest`].
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SSTs selected in `picker_output` into new SST files and
    /// returns the files to add and to remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Records `merge_output` in the region manifest and returns the applied edit.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Runs a full compaction of `compaction_region` with the given request options.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
306
/// Stateless [`Compactor`] implementation: merges picked SSTs locally and
/// commits the result to the region manifest.
pub struct DefaultCompactor;
309
310#[async_trait::async_trait]
311impl Compactor for DefaultCompactor {
312 async fn merge_ssts(
313 &self,
314 compaction_region: &CompactionRegion,
315 mut picker_output: PickerOutput,
316 ) -> Result<MergeOutput> {
317 let mut futs = Vec::with_capacity(picker_output.outputs.len());
318 let mut compacted_inputs =
319 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
320 let internal_parallelism = compaction_region.max_parallelism.max(1);
321
322 for output in picker_output.outputs.drain(..) {
323 compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
324 let write_opts = WriteOptions {
325 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
326 max_file_size: picker_output.max_file_size,
327 ..Default::default()
328 };
329
330 let region_metadata = compaction_region.region_metadata.clone();
331 let sst_layer = compaction_region.access_layer.clone();
332 let region_id = compaction_region.region_id;
333 let cache_manager = compaction_region.cache_manager.clone();
334 let storage = compaction_region.region_options.storage.clone();
335 let index_options = compaction_region
336 .current_version
337 .options
338 .index_options
339 .clone();
340 let append_mode = compaction_region.current_version.options.append_mode;
341 let merge_mode = compaction_region.current_version.options.merge_mode();
342 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
343 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
344 let bloom_filter_index_config =
345 compaction_region.engine_config.bloom_filter_index.clone();
346 let max_sequence = output
347 .inputs
348 .iter()
349 .map(|f| f.meta_ref().sequence)
350 .max()
351 .flatten();
352 futs.push(async move {
353 let input_file_names = output
354 .inputs
355 .iter()
356 .map(|f| f.file_id().to_string())
357 .join(",");
358 let reader = CompactionSstReaderBuilder {
359 metadata: region_metadata.clone(),
360 sst_layer: sst_layer.clone(),
361 cache: cache_manager.clone(),
362 inputs: &output.inputs,
363 append_mode,
364 filter_deleted: output.filter_deleted,
365 time_range: output.output_time_range,
366 merge_mode,
367 }
368 .build_sst_reader()
369 .await?;
370 let (sst_infos, metrics) = sst_layer
371 .write_sst(
372 SstWriteRequest {
373 op_type: OperationType::Compact,
374 metadata: region_metadata,
375 source: Source::Reader(reader),
376 cache_manager,
377 storage,
378 max_sequence: max_sequence.map(NonZero::get),
379 index_options,
380 inverted_index_config,
381 fulltext_index_config,
382 bloom_filter_index_config,
383 },
384 &write_opts,
385 WriteType::Compaction,
386 )
387 .await?;
388 let output_files = sst_infos
389 .into_iter()
390 .map(|sst_info| FileMeta {
391 region_id,
392 file_id: sst_info.file_id,
393 time_range: sst_info.time_range,
394 level: output.output_level,
395 file_size: sst_info.file_size,
396 available_indexes: sst_info.index_metadata.build_available_indexes(),
397 index_file_size: sst_info.index_metadata.file_size,
398 num_rows: sst_info.num_rows as u64,
399 num_row_groups: sst_info.num_row_groups,
400 sequence: max_sequence,
401 })
402 .collect::<Vec<_>>();
403 let output_file_names =
404 output_files.iter().map(|f| f.file_id.to_string()).join(",");
405 info!(
406 "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
407 region_id, input_file_names, output_file_names, metrics
408 );
409 metrics.observe();
410 Ok(output_files)
411 });
412 }
413 let mut output_files = Vec::with_capacity(futs.len());
414 while !futs.is_empty() {
415 let mut task_chunk = Vec::with_capacity(internal_parallelism);
416 for _ in 0..internal_parallelism {
417 if let Some(task) = futs.pop() {
418 task_chunk.push(common_runtime::spawn_compact(task));
419 }
420 }
421 let metas = futures::future::try_join_all(task_chunk)
422 .await
423 .context(JoinSnafu)?
424 .into_iter()
425 .collect::<Result<Vec<Vec<_>>>>()?;
426 output_files.extend(metas.into_iter().flatten());
427 }
428
429 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
430 inputs.extend(
431 picker_output
432 .expired_ssts
433 .iter()
434 .map(|f| f.meta_ref().clone()),
435 );
436
437 Ok(MergeOutput {
438 files_to_add: output_files,
439 files_to_remove: inputs,
440 compaction_time_window: Some(picker_output.time_window_size),
441 })
442 }
443
444 async fn update_manifest(
445 &self,
446 compaction_region: &CompactionRegion,
447 merge_output: MergeOutput,
448 ) -> Result<RegionEdit> {
449 let edit = RegionEdit {
451 files_to_add: merge_output.files_to_add,
452 files_to_remove: merge_output.files_to_remove,
453 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
455 compaction_time_window: merge_output
456 .compaction_time_window
457 .map(|seconds| Duration::from_secs(seconds as u64)),
458 flushed_entry_id: None,
459 flushed_sequence: None,
460 };
461
462 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
463 compaction_region
465 .manifest_ctx
466 .update_manifest(RegionLeaderState::Writable, action_list)
467 .await?;
468
469 Ok(edit)
470 }
471
472 async fn compact(
475 &self,
476 compaction_region: &CompactionRegion,
477 compact_request_options: compact_request::Options,
478 ) -> Result<()> {
479 let picker_output = {
480 let picker_output = new_picker(
481 &compact_request_options,
482 &compaction_region.region_options.compaction,
483 compaction_region.region_options.append_mode,
484 )
485 .pick(compaction_region);
486
487 if let Some(picker_output) = picker_output {
488 picker_output
489 } else {
490 info!(
491 "No files to compact for region_id: {}",
492 compaction_region.region_id
493 );
494 return Ok(());
495 }
496 };
497
498 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
499 if merge_output.is_empty() {
500 info!(
501 "No files to compact for region_id: {}",
502 compaction_region.region_id
503 );
504 return Ok(());
505 }
506
507 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
508 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
509 self.update_manifest(compaction_region, merge_output)
510 .await?;
511
512 Ok(())
513 }
514}