1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::{PickerOutput, new_picker};
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
39use crate::config::MitoConfig;
40use crate::error::{
41 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
45use crate::metrics;
46use crate::read::{FlatSource, Source};
47use crate::region::options::RegionOptions;
48use crate::region::version::VersionRef;
49use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
50use crate::schedule::scheduler::LocalScheduler;
51use crate::sst::FormatType;
52use crate::sst::file::FileMeta;
53use crate::sst::file_purger::LocalFilePurger;
54use crate::sst::index::intermediate::IntermediateManager;
55use crate::sst::index::puffin_manager::PuffinManagerFactory;
56use crate::sst::location::region_dir_from_table_dir;
57use crate::sst::parquet::WriteOptions;
58use crate::sst::version::{SstVersion, SstVersionRef};
59
60#[derive(Clone)]
62pub struct CompactionVersion {
63 pub(crate) metadata: RegionMetadataRef,
68 pub(crate) options: RegionOptions,
70 pub(crate) ssts: SstVersionRef,
72 pub(crate) compaction_time_window: Option<Duration>,
74}
75
76impl From<VersionRef> for CompactionVersion {
77 fn from(value: VersionRef) -> Self {
78 Self {
79 metadata: value.metadata.clone(),
80 options: value.options.clone(),
81 ssts: value.ssts.clone(),
82 compaction_time_window: value.compaction_time_window,
83 }
84 }
85}
86
87#[derive(Clone)]
90pub struct CompactionRegion {
91 pub region_id: RegionId,
92 pub region_options: RegionOptions,
93
94 pub(crate) engine_config: Arc<MitoConfig>,
95 pub(crate) region_metadata: RegionMetadataRef,
96 pub(crate) cache_manager: CacheManagerRef,
97 pub access_layer: AccessLayerRef,
99 pub(crate) manifest_ctx: Arc<ManifestContext>,
100 pub(crate) current_version: CompactionVersion,
101 pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
102 pub(crate) ttl: Option<TimeToLive>,
103
104 pub max_parallelism: usize,
109}
110
111#[derive(Debug, Clone)]
113pub struct OpenCompactionRegionRequest {
114 pub region_id: RegionId,
115 pub table_dir: String,
116 pub path_type: PathType,
117 pub region_options: RegionOptions,
118 pub max_parallelism: usize,
119}
120
121pub async fn open_compaction_region(
124 req: &OpenCompactionRegionRequest,
125 mito_config: &MitoConfig,
126 object_store_manager: ObjectStoreManagerRef,
127 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
128) -> Result<CompactionRegion> {
129 let object_store = {
130 let name = &req.region_options.storage;
131 if let Some(name) = name {
132 object_store_manager
133 .find(name)
134 .with_context(|| ObjectStoreNotFoundSnafu {
135 object_store: name.clone(),
136 })?
137 } else {
138 object_store_manager.default_object_store()
139 }
140 };
141
142 let access_layer = {
143 let puffin_manager_factory = PuffinManagerFactory::new(
144 &mito_config.index.aux_path,
145 mito_config.index.staging_size.as_bytes(),
146 Some(mito_config.index.write_buffer_size.as_bytes() as _),
147 mito_config.index.staging_ttl,
148 )
149 .await?;
150 let intermediate_manager =
151 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
152
153 Arc::new(AccessLayer::new(
154 &req.table_dir,
155 req.path_type,
156 object_store.clone(),
157 puffin_manager_factory,
158 intermediate_manager,
159 ))
160 };
161
162 let manifest_manager = {
163 let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
164 let region_manifest_options =
165 RegionManifestOptions::new(mito_config, ®ion_dir, object_store);
166
167 RegionManifestManager::open(region_manifest_options, &Default::default())
168 .await?
169 .with_context(|| EmptyRegionDirSnafu {
170 region_id: req.region_id,
171 region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
172 })?
173 };
174
175 let manifest = manifest_manager.manifest();
176 let region_metadata = manifest.metadata.clone();
177 let manifest_ctx = Arc::new(ManifestContext::new(
178 manifest_manager,
179 RegionRoleState::Leader(RegionLeaderState::Writable),
180 ));
181
182 let file_purger = {
183 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
184 Arc::new(LocalFilePurger::new(
185 purge_scheduler.clone(),
186 access_layer.clone(),
187 None,
188 ))
189 };
190
191 let current_version = {
192 let mut ssts = SstVersion::new();
193 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
194 CompactionVersion {
195 metadata: region_metadata.clone(),
196 options: req.region_options.clone(),
197 ssts: Arc::new(ssts),
198 compaction_time_window: manifest.compaction_time_window,
199 }
200 };
201
202 let ttl = match ttl_provider {
203 Either::Left(ttl) => ttl,
205 Either::Right(schema_metadata_manager) => find_ttl(
207 req.region_id.table_id(),
208 current_version.options.ttl,
209 &schema_metadata_manager,
210 )
211 .await
212 .unwrap_or_else(|e| {
213 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
214 TimeToLive::default()
215 }),
216 };
217
218 Ok(CompactionRegion {
219 region_id: req.region_id,
220 region_options: req.region_options.clone(),
221 engine_config: Arc::new(mito_config.clone()),
222 region_metadata: region_metadata.clone(),
223 cache_manager: Arc::new(CacheManager::default()),
224 access_layer,
225 manifest_ctx,
226 current_version,
227 file_purger: Some(file_purger),
228 ttl: Some(ttl),
229 max_parallelism: req.max_parallelism,
230 })
231}
232
233impl CompactionRegion {
234 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
236 self.file_purger.clone()
237 }
238
239 pub async fn stop_purger_scheduler(&self) -> Result<()> {
241 if let Some(file_purger) = &self.file_purger {
242 file_purger.stop_scheduler().await
243 } else {
244 Ok(())
245 }
246 }
247}
248
249#[derive(Default, Clone, Debug, Serialize, Deserialize)]
251pub struct MergeOutput {
252 pub files_to_add: Vec<FileMeta>,
253 pub files_to_remove: Vec<FileMeta>,
254 pub compaction_time_window: Option<i64>,
255}
256
257impl MergeOutput {
258 pub fn is_empty(&self) -> bool {
259 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
260 }
261
262 pub fn input_file_size(&self) -> u64 {
263 self.files_to_remove.iter().map(|f| f.file_size).sum()
264 }
265
266 pub fn output_file_size(&self) -> u64 {
267 self.files_to_add.iter().map(|f| f.file_size).sum()
268 }
269}
270
/// Strategy for compacting a region: merging SSTs, committing the result to the manifest,
/// and driving the whole process end to end.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected in `picker_output` and returns the files to add to and
    /// remove from the region.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Records `merge_output` in the region manifest and returns the [`RegionEdit`] that
    /// describes the change.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Runs a full compaction of `compaction_region` according to
    /// `compact_request_options`.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
295
/// The built-in [`Compactor`] implementation.
pub struct DefaultCompactor;
298
299impl DefaultCompactor {
300 async fn merge_single_output(
302 compaction_region: CompactionRegion,
303 output: CompactionOutput,
304 write_opts: WriteOptions,
305 ) -> Result<Vec<FileMeta>> {
306 let region_id = compaction_region.region_id;
307 let storage = compaction_region.region_options.storage.clone();
308 let index_options = compaction_region
309 .current_version
310 .options
311 .index_options
312 .clone();
313 let append_mode = compaction_region.current_version.options.append_mode;
314 let merge_mode = compaction_region.current_version.options.merge_mode();
315 let flat_format = compaction_region
316 .region_options
317 .sst_format
318 .map(|format| format == FormatType::Flat)
319 .unwrap_or(
320 compaction_region
321 .engine_config
322 .default_experimental_flat_format,
323 );
324
325 let index_config = compaction_region.engine_config.index.clone();
326 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
327 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
328 let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
329
330 let input_file_names = output
331 .inputs
332 .iter()
333 .map(|f| f.file_id().to_string())
334 .join(",");
335 let max_sequence = output
336 .inputs
337 .iter()
338 .map(|f| f.meta_ref().sequence)
339 .max()
340 .flatten();
341 let builder = CompactionSstReaderBuilder {
342 metadata: compaction_region.region_metadata.clone(),
343 sst_layer: compaction_region.access_layer.clone(),
344 cache: compaction_region.cache_manager.clone(),
345 inputs: &output.inputs,
346 append_mode,
347 filter_deleted: output.filter_deleted,
348 time_range: output.output_time_range,
349 merge_mode,
350 };
351 let source = if flat_format {
352 let reader = builder.build_flat_sst_reader().await?;
353 Either::Right(FlatSource::Stream(reader))
354 } else {
355 let reader = builder.build_sst_reader().await?;
356 Either::Left(Source::Reader(reader))
357 };
358 let mut metrics = Metrics::new(WriteType::Compaction);
359 let region_metadata = compaction_region.region_metadata.clone();
360 let sst_infos = compaction_region
361 .access_layer
362 .write_sst(
363 SstWriteRequest {
364 op_type: OperationType::Compact,
365 metadata: region_metadata.clone(),
366 source,
367 cache_manager: compaction_region.cache_manager.clone(),
368 storage,
369 max_sequence: max_sequence.map(NonZero::get),
370 index_options,
371 index_config,
372 inverted_index_config,
373 fulltext_index_config,
374 bloom_filter_index_config,
375 },
376 &write_opts,
377 &mut metrics,
378 )
379 .await?;
380 let partition_expr = match ®ion_metadata.partition_expr {
382 None => None,
383 Some(json_str) if json_str.is_empty() => None,
384 Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
385 InvalidPartitionExprSnafu {
386 expr: json_str.clone(),
387 }
388 })?,
389 };
390
391 let output_files = sst_infos
392 .into_iter()
393 .map(|sst_info| FileMeta {
394 region_id,
395 file_id: sst_info.file_id,
396 time_range: sst_info.time_range,
397 level: output.output_level,
398 file_size: sst_info.file_size,
399 available_indexes: sst_info.index_metadata.build_available_indexes(),
400 indexes: sst_info.index_metadata.build_indexes(),
401 index_file_size: sst_info.index_metadata.file_size,
402 index_file_id: None,
403 num_rows: sst_info.num_rows as u64,
404 num_row_groups: sst_info.num_row_groups,
405 sequence: max_sequence,
406 partition_expr: partition_expr.clone(),
407 num_series: sst_info.num_series,
408 })
409 .collect::<Vec<_>>();
410 let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
411 info!(
412 "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
413 region_id, input_file_names, output_file_names, flat_format, metrics
414 );
415 metrics.observe();
416 Ok(output_files)
417 }
418}
419
420#[async_trait::async_trait]
421impl Compactor for DefaultCompactor {
422 async fn merge_ssts(
423 &self,
424 compaction_region: &CompactionRegion,
425 mut picker_output: PickerOutput,
426 ) -> Result<MergeOutput> {
427 let mut futs = Vec::with_capacity(picker_output.outputs.len());
428 let mut compacted_inputs =
429 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
430 let internal_parallelism = compaction_region.max_parallelism.max(1);
431 let compaction_time_window = picker_output.time_window_size;
432
433 for output in picker_output.outputs.drain(..) {
434 let inputs_to_remove: Vec<_> =
435 output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
436 compacted_inputs.extend(inputs_to_remove.iter().cloned());
437 let write_opts = WriteOptions {
438 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
439 max_file_size: picker_output.max_file_size,
440 ..Default::default()
441 };
442 futs.push(Self::merge_single_output(
443 compaction_region.clone(),
444 output,
445 write_opts,
446 ));
447 }
448 let mut output_files = Vec::with_capacity(futs.len());
449 while !futs.is_empty() {
450 let mut task_chunk = Vec::with_capacity(internal_parallelism);
451 for _ in 0..internal_parallelism {
452 if let Some(task) = futs.pop() {
453 task_chunk.push(common_runtime::spawn_compact(task));
454 }
455 }
456 let metas = futures::future::try_join_all(task_chunk)
457 .await
458 .context(JoinSnafu)?
459 .into_iter()
460 .collect::<Result<Vec<Vec<_>>>>()?;
461 output_files.extend(metas.into_iter().flatten());
462 }
463
464 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
467 inputs.extend(
468 picker_output
469 .expired_ssts
470 .iter()
471 .map(|f| f.meta_ref().clone()),
472 );
473
474 Ok(MergeOutput {
475 files_to_add: output_files,
476 files_to_remove: inputs,
477 compaction_time_window: Some(compaction_time_window),
478 })
479 }
480
481 async fn update_manifest(
482 &self,
483 compaction_region: &CompactionRegion,
484 merge_output: MergeOutput,
485 ) -> Result<RegionEdit> {
486 let edit = RegionEdit {
488 files_to_add: merge_output.files_to_add,
489 files_to_remove: merge_output.files_to_remove,
490 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
492 compaction_time_window: merge_output
493 .compaction_time_window
494 .map(|seconds| Duration::from_secs(seconds as u64)),
495 flushed_entry_id: None,
496 flushed_sequence: None,
497 committed_sequence: None,
498 };
499
500 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
501 compaction_region
503 .manifest_ctx
504 .update_manifest(RegionLeaderState::Writable, action_list, false)
505 .await?;
506
507 Ok(edit)
508 }
509
510 async fn compact(
513 &self,
514 compaction_region: &CompactionRegion,
515 compact_request_options: compact_request::Options,
516 ) -> Result<()> {
517 let picker_output = {
518 let picker_output = new_picker(
519 &compact_request_options,
520 &compaction_region.region_options.compaction,
521 compaction_region.region_options.append_mode,
522 None,
523 )
524 .pick(compaction_region);
525
526 if let Some(picker_output) = picker_output {
527 picker_output
528 } else {
529 info!(
530 "No files to compact for region_id: {}",
531 compaction_region.region_id
532 );
533 return Ok(());
534 }
535 };
536
537 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
538 if merge_output.is_empty() {
539 info!(
540 "No files to compact for region_id: {}",
541 compaction_region.region_id
542 );
543 return Ok(());
544 }
545
546 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
547 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
548 self.update_manifest(compaction_region, merge_output)
549 .await?;
550
551 Ok(())
552 }
553}