1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use itertools::Itertools;
24use object_store::manager::ObjectStoreManagerRef;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
31use crate::cache::{CacheManager, CacheManagerRef};
32use crate::compaction::picker::{new_picker, PickerOutput};
33use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
34use crate::config::MitoConfig;
35use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
36use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
37use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
38use crate::manifest::storage::manifest_compress_type;
39use crate::metrics;
40use crate::read::Source;
41use crate::region::opener::new_manifest_dir;
42use crate::region::options::RegionOptions;
43use crate::region::version::VersionRef;
44use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
45use crate::schedule::scheduler::LocalScheduler;
46use crate::sst::file::FileMeta;
47use crate::sst::file_purger::LocalFilePurger;
48use crate::sst::index::intermediate::IntermediateManager;
49use crate::sst::index::puffin_manager::PuffinManagerFactory;
50use crate::sst::parquet::WriteOptions;
51use crate::sst::version::{SstVersion, SstVersionRef};
52
53#[derive(Clone)]
55pub struct CompactionVersion {
56 pub(crate) metadata: RegionMetadataRef,
61 pub(crate) options: RegionOptions,
63 pub(crate) ssts: SstVersionRef,
65 pub(crate) compaction_time_window: Option<Duration>,
67}
68
69impl From<VersionRef> for CompactionVersion {
70 fn from(value: VersionRef) -> Self {
71 Self {
72 metadata: value.metadata.clone(),
73 options: value.options.clone(),
74 ssts: value.ssts.clone(),
75 compaction_time_window: value.compaction_time_window,
76 }
77 }
78}
79
80#[derive(Clone)]
83pub struct CompactionRegion {
84 pub region_id: RegionId,
85 pub region_options: RegionOptions,
86 pub region_dir: String,
87
88 pub(crate) engine_config: Arc<MitoConfig>,
89 pub(crate) region_metadata: RegionMetadataRef,
90 pub(crate) cache_manager: CacheManagerRef,
91 pub(crate) access_layer: AccessLayerRef,
92 pub(crate) manifest_ctx: Arc<ManifestContext>,
93 pub(crate) current_version: CompactionVersion,
94 pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
95 pub(crate) ttl: Option<TimeToLive>,
96
97 pub max_parallelism: usize,
102}
103
104#[derive(Debug, Clone)]
106pub struct OpenCompactionRegionRequest {
107 pub region_id: RegionId,
108 pub region_dir: String,
109 pub region_options: RegionOptions,
110 pub max_parallelism: usize,
111}
112
113pub async fn open_compaction_region(
116 req: &OpenCompactionRegionRequest,
117 mito_config: &MitoConfig,
118 object_store_manager: ObjectStoreManagerRef,
119 schema_metadata_manager: SchemaMetadataManagerRef,
120) -> Result<CompactionRegion> {
121 let object_store = {
122 let name = &req.region_options.storage;
123 if let Some(name) = name {
124 object_store_manager
125 .find(name)
126 .context(ObjectStoreNotFoundSnafu {
127 object_store: name.to_string(),
128 })?
129 } else {
130 object_store_manager.default_object_store()
131 }
132 };
133
134 let access_layer = {
135 let puffin_manager_factory = PuffinManagerFactory::new(
136 &mito_config.index.aux_path,
137 mito_config.index.staging_size.as_bytes(),
138 Some(mito_config.index.write_buffer_size.as_bytes() as _),
139 mito_config.index.staging_ttl,
140 )
141 .await?;
142 let intermediate_manager =
143 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
144
145 Arc::new(AccessLayer::new(
146 req.region_dir.as_str(),
147 object_store.clone(),
148 puffin_manager_factory,
149 intermediate_manager,
150 ))
151 };
152
153 let manifest_manager = {
154 let region_manifest_options = RegionManifestOptions {
155 manifest_dir: new_manifest_dir(req.region_dir.as_str()),
156 object_store: object_store.clone(),
157 compress_type: manifest_compress_type(mito_config.compress_manifest),
158 checkpoint_distance: mito_config.manifest_checkpoint_distance,
159 };
160
161 RegionManifestManager::open(
162 region_manifest_options,
163 Default::default(),
164 Default::default(),
165 )
166 .await?
167 .context(EmptyRegionDirSnafu {
168 region_id: req.region_id,
169 region_dir: req.region_dir.as_str(),
170 })?
171 };
172
173 let manifest = manifest_manager.manifest();
174 let region_metadata = manifest.metadata.clone();
175 let manifest_ctx = Arc::new(ManifestContext::new(
176 manifest_manager,
177 RegionRoleState::Leader(RegionLeaderState::Writable),
178 ));
179
180 let file_purger = {
181 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
182 Arc::new(LocalFilePurger::new(
183 purge_scheduler.clone(),
184 access_layer.clone(),
185 None,
186 ))
187 };
188
189 let current_version = {
190 let mut ssts = SstVersion::new();
191 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
192 CompactionVersion {
193 metadata: region_metadata.clone(),
194 options: req.region_options.clone(),
195 ssts: Arc::new(ssts),
196 compaction_time_window: manifest.compaction_time_window,
197 }
198 };
199
200 let ttl = find_ttl(
201 req.region_id.table_id(),
202 current_version.options.ttl,
203 &schema_metadata_manager,
204 )
205 .await
206 .unwrap_or_else(|e| {
207 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
208 TimeToLive::default()
209 });
210 Ok(CompactionRegion {
211 region_id: req.region_id,
212 region_options: req.region_options.clone(),
213 region_dir: req.region_dir.clone(),
214 engine_config: Arc::new(mito_config.clone()),
215 region_metadata: region_metadata.clone(),
216 cache_manager: Arc::new(CacheManager::default()),
217 access_layer,
218 manifest_ctx,
219 current_version,
220 file_purger: Some(file_purger),
221 ttl: Some(ttl),
222 max_parallelism: req.max_parallelism,
223 })
224}
225
226impl CompactionRegion {
227 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
228 self.file_purger.clone()
229 }
230}
231
232#[derive(Default, Clone, Debug, Serialize, Deserialize)]
234pub struct MergeOutput {
235 pub files_to_add: Vec<FileMeta>,
236 pub files_to_remove: Vec<FileMeta>,
237 pub compaction_time_window: Option<i64>,
238}
239
240impl MergeOutput {
241 pub fn is_empty(&self) -> bool {
242 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
243 }
244
245 pub fn input_file_size(&self) -> u64 {
246 self.files_to_remove.iter().map(|f| f.file_size).sum()
247 }
248
249 pub fn output_file_size(&self) -> u64 {
250 self.files_to_add.iter().map(|f| f.file_size).sum()
251 }
252}
253
254#[async_trait::async_trait]
256pub trait Compactor: Send + Sync + 'static {
257 async fn merge_ssts(
259 &self,
260 compaction_region: &CompactionRegion,
261 picker_output: PickerOutput,
262 ) -> Result<MergeOutput>;
263
264 async fn update_manifest(
266 &self,
267 compaction_region: &CompactionRegion,
268 merge_output: MergeOutput,
269 ) -> Result<RegionEdit>;
270
271 async fn compact(
273 &self,
274 compaction_region: &CompactionRegion,
275 compact_request_options: compact_request::Options,
276 ) -> Result<()>;
277}
278
/// The built-in [`Compactor`] implementation. It holds no state, so it is
/// freely copyable and constructible via `Default`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultCompactor;
281
282#[async_trait::async_trait]
283impl Compactor for DefaultCompactor {
284 async fn merge_ssts(
285 &self,
286 compaction_region: &CompactionRegion,
287 mut picker_output: PickerOutput,
288 ) -> Result<MergeOutput> {
289 let mut futs = Vec::with_capacity(picker_output.outputs.len());
290 let mut compacted_inputs =
291 Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
292 let internal_parallelism = compaction_region.max_parallelism.max(1);
293
294 for output in picker_output.outputs.drain(..) {
295 compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
296 let write_opts = WriteOptions {
297 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
298 max_file_size: picker_output.max_file_size,
299 ..Default::default()
300 };
301
302 let region_metadata = compaction_region.region_metadata.clone();
303 let sst_layer = compaction_region.access_layer.clone();
304 let region_id = compaction_region.region_id;
305 let cache_manager = compaction_region.cache_manager.clone();
306 let storage = compaction_region.region_options.storage.clone();
307 let index_options = compaction_region
308 .current_version
309 .options
310 .index_options
311 .clone();
312 let append_mode = compaction_region.current_version.options.append_mode;
313 let merge_mode = compaction_region.current_version.options.merge_mode();
314 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
315 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
316 let bloom_filter_index_config =
317 compaction_region.engine_config.bloom_filter_index.clone();
318 let max_sequence = output
319 .inputs
320 .iter()
321 .map(|f| f.meta_ref().sequence)
322 .max()
323 .flatten();
324 futs.push(async move {
325 let input_file_names = output
326 .inputs
327 .iter()
328 .map(|f| f.file_id().to_string())
329 .join(",");
330 let reader = CompactionSstReaderBuilder {
331 metadata: region_metadata.clone(),
332 sst_layer: sst_layer.clone(),
333 cache: cache_manager.clone(),
334 inputs: &output.inputs,
335 append_mode,
336 filter_deleted: output.filter_deleted,
337 time_range: output.output_time_range,
338 merge_mode,
339 }
340 .build_sst_reader()
341 .await?;
342 let output_files = sst_layer
343 .write_sst(
344 SstWriteRequest {
345 op_type: OperationType::Compact,
346 metadata: region_metadata,
347 source: Source::Reader(reader),
348 cache_manager,
349 storage,
350 max_sequence: max_sequence.map(NonZero::get),
351 index_options,
352 inverted_index_config,
353 fulltext_index_config,
354 bloom_filter_index_config,
355 },
356 &write_opts,
357 )
358 .await?
359 .into_iter()
360 .map(|sst_info| FileMeta {
361 region_id,
362 file_id: sst_info.file_id,
363 time_range: sst_info.time_range,
364 level: output.output_level,
365 file_size: sst_info.file_size,
366 available_indexes: sst_info.index_metadata.build_available_indexes(),
367 index_file_size: sst_info.index_metadata.file_size,
368 num_rows: sst_info.num_rows as u64,
369 num_row_groups: sst_info.num_row_groups,
370 sequence: max_sequence,
371 })
372 .collect::<Vec<_>>();
373 let output_file_names =
374 output_files.iter().map(|f| f.file_id.to_string()).join(",");
375 info!(
376 "Region {} compaction inputs: [{}], outputs: [{}]",
377 region_id, input_file_names, output_file_names
378 );
379 Ok(output_files)
380 });
381 }
382 let mut output_files = Vec::with_capacity(futs.len());
383 while !futs.is_empty() {
384 let mut task_chunk = Vec::with_capacity(internal_parallelism);
385 for _ in 0..internal_parallelism {
386 if let Some(task) = futs.pop() {
387 task_chunk.push(common_runtime::spawn_compact(task));
388 }
389 }
390 let metas = futures::future::try_join_all(task_chunk)
391 .await
392 .context(JoinSnafu)?
393 .into_iter()
394 .collect::<Result<Vec<Vec<_>>>>()?;
395 output_files.extend(metas.into_iter().flatten());
396 }
397
398 let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
399 inputs.extend(
400 picker_output
401 .expired_ssts
402 .iter()
403 .map(|f| f.meta_ref().clone()),
404 );
405
406 Ok(MergeOutput {
407 files_to_add: output_files,
408 files_to_remove: inputs,
409 compaction_time_window: Some(picker_output.time_window_size),
410 })
411 }
412
413 async fn update_manifest(
414 &self,
415 compaction_region: &CompactionRegion,
416 merge_output: MergeOutput,
417 ) -> Result<RegionEdit> {
418 let edit = RegionEdit {
420 files_to_add: merge_output.files_to_add,
421 files_to_remove: merge_output.files_to_remove,
422 compaction_time_window: merge_output
423 .compaction_time_window
424 .map(|seconds| Duration::from_secs(seconds as u64)),
425 flushed_entry_id: None,
426 flushed_sequence: None,
427 };
428
429 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
430 compaction_region
432 .manifest_ctx
433 .update_manifest(RegionLeaderState::Writable, action_list)
434 .await?;
435
436 Ok(edit)
437 }
438
439 async fn compact(
442 &self,
443 compaction_region: &CompactionRegion,
444 compact_request_options: compact_request::Options,
445 ) -> Result<()> {
446 let picker_output = {
447 let picker_output = new_picker(
448 &compact_request_options,
449 &compaction_region.region_options.compaction,
450 compaction_region.region_options.append_mode,
451 )
452 .pick(compaction_region);
453
454 if let Some(picker_output) = picker_output {
455 picker_output
456 } else {
457 info!(
458 "No files to compact for region_id: {}",
459 compaction_region.region_id
460 );
461 return Ok(());
462 }
463 };
464
465 let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
466 if merge_output.is_empty() {
467 info!(
468 "No files to compact for region_id: {}",
469 compaction_region.region_id
470 );
471 return Ok(());
472 }
473
474 metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
475 metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
476 self.update_manifest(compaction_region, merge_output)
477 .await?;
478
479 Ok(())
480 }
481}