1use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Instant;
18
19use clap::Parser;
20use colored::Colorize;
21use datanode::config::RegionEngineConfig;
22use datanode::store;
23use either::Either;
24use mito2::access_layer::{
25 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
26};
27use mito2::cache::{CacheManager, CacheManagerRef};
28use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29use mito2::read::Source;
30use mito2::sst::file::{FileHandle, FileMeta};
31use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
32use mito2::sst::index::intermediate::IntermediateManager;
33use mito2::sst::index::puffin_manager::PuffinManagerFactory;
34use mito2::sst::parquet::reader::ParquetReaderBuilder;
35use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
36use mito2::worker::write_cache_from_config;
37use object_store::ObjectStore;
38use regex::Regex;
39use snafu::OptionExt;
40use store_api::metadata::{RegionMetadata, RegionMetadataRef};
41use store_api::path_utils::region_name;
42use store_api::region_request::PathType;
43use store_api::storage::FileId;
44
45use crate::datanode::{StorageConfig, StorageConfigWrapper};
46use crate::error;
47
// Benchmarks the SST write path: reads an existing SST file from object
// storage, rewrites it (including index creation) through a fresh access
// layer, reports timings/metrics, then deletes the files it produced.
// NOTE: plain `//` comments are used here on purpose — `///` doc comments on a
// clap-derived struct would change the generated `--help` text.
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
    // Path to a datanode TOML config supplying storage and Mito engine settings.
    #[clap(long, value_name = "FILE")]
    pub config: PathBuf,

    // Object-store path of the source SST file, e.g.
    // `data/{catalog}/{schema}/{table_id}/{table_id}_{seq}/data/{file_id}.parquet`.
    #[clap(long, value_name = "PATH")]
    pub source: String,

    // Enables verbose (debug) logging.
    #[clap(short, long, default_value_t = false)]
    pub verbose: bool,

    // Optional output file for a CPU flamegraph of the write (Unix only).
    #[clap(long, value_name = "FILE")]
    pub pprof_file: Option<PathBuf>,
}
67
68fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
69 let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
70 error::IllegalConfigSnafu {
71 msg: format!("failed to read config {}: {e}", config_path.display()),
72 }
73 .build()
74 })?;
75
76 let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
77 error::IllegalConfigSnafu {
78 msg: format!("failed to parse config {}: {e}", config_path.display()),
79 }
80 .build()
81 })?;
82
83 let storage_config = store_cfg.storage;
84 let mito_engine_config = store_cfg
85 .region_engine
86 .into_iter()
87 .filter_map(|c| {
88 if let RegionEngineConfig::Mito(mito) = c {
89 Some(mito)
90 } else {
91 None
92 }
93 })
94 .next()
95 .with_context(|| error::IllegalConfigSnafu {
96 msg: format!("Engine config not found in {:?}", config_path),
97 })?;
98 Ok((storage_config, mito_engine_config))
99}
100
impl ObjbenchCommand {
    /// Runs the benchmark end to end:
    /// 1. loads the datanode config and builds the object store,
    /// 2. reads the source SST's parquet + region metadata,
    /// 3. rewrites the SST (and its indexes) through a fresh access layer,
    ///    optionally under a CPU profiler,
    /// 4. prints timings/metrics and deletes the files it produced.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting objbench with config:".cyan().bold());

        // `mito_engine_config` is mutable because building the access layer
        // sanitizes its index paths in place.
        let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        // Parse catalog/schema/table/region/file-id out of the source path.
        let components = parse_file_dir_components(&self.source)?;
        println!(
            "{} Source path parsed: {}, components: {:?}",
            "✓".green(),
            self.source,
            components
        );

        println!("{}", "Loading parquet metadata...".yellow());
        let file_size = object_store
            .stat(&self.source)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("stat failed: {e}"),
                }
                .build()
            })?
            .content_length();
        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("read parquet metadata failed: {e}"),
                }
                .build()
            })?;

        // Region metadata is embedded as JSON in the parquet key/value footer.
        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
        let num_row_groups = parquet_meta.num_row_groups() as u64;
        // Largest uncompressed row-group size; recorded in `FileMeta` below.
        let max_row_group_uncompressed_size: u64 = parquet_meta
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);

        println!(
            "{} Metadata loaded - rows: {}, size: {} bytes",
            "✓".green(),
            num_rows,
            file_size
        );

        // Synthesize a file handle for the source SST so the reader can treat
        // it like a regular region file. Fields we cannot know here (time
        // range, indexes, sequence, ...) are defaulted — they are not needed
        // for reading the data back.
        let file_meta = FileMeta {
            region_id: region_meta.region_id,
            file_id: components.file_id,
            time_range: Default::default(),
            level: 0,
            file_size,
            max_row_group_uncompressed_size,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        // Noop purger: cleanup is done explicitly at the end of this function.
        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());

        let table_dir = components.table_dir();
        let (src_access_layer, cache_manager) = build_access_layer_simple(
            &components,
            object_store.clone(),
            &mut mito_engine_config,
            &store_cfg.data_home,
        )
        .await?;
        let reader_build_start = Instant::now();

        let reader = ParquetReaderBuilder::new(
            table_dir,
            components.path_type,
            src_handle.clone(),
            object_store.clone(),
        )
        .expected_metadata(Some(region_meta.clone()))
        .build()
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("build reader failed: {e:?}"),
            }
            .build()
        })?;

        let reader_build_elapsed = reader_build_start.elapsed();
        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
        println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);

        // Disable full-text index creation on compaction so the benchmark
        // measures the plain flush/write path.
        let fulltext_index_config = FulltextIndexConfig {
            create_on_compaction: Mode::Disable,
            ..Default::default()
        };

        let write_req = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: region_meta,
            source: Either::Left(Source::Reader(Box::new(reader))),
            cache_manager,
            storage: None,
            max_sequence: None,
            index_options: Default::default(),
            index_config: mito_engine_config.index.clone(),
            inverted_index_config: MitoConfig::default().inverted_index,
            fulltext_index_config,
            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
        };

        println!("{}", "Writing SST...".yellow());

        // Optionally sample the write under pprof (Unix only). Common runtime
        // libraries are blocklisted to keep the flamegraph readable.
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        error::IllegalConfigSnafu {
                            msg: format!("Failed to start profiler: {e}"),
                        }
                        .build()
                    })?,
            )
        } else {
            None
        };

        // On non-Unix platforms the flag is accepted but profiling is skipped.
        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let write_start = Instant::now();
        let mut metrics = Metrics::new(WriteType::Flush);
        let infos = src_access_layer
            .write_sst(write_req, &WriteOptions::default(), &mut metrics)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("write_sst failed: {e:?}"),
                }
                .build()
            })?;

        let write_elapsed = write_start.elapsed();

        // Stop profiling (guard drops here) and render the flamegraph if
        // requested. Failures are reported but do not abort the benchmark.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }
        // Writing a single source is expected to produce exactly one SST.
        assert_eq!(infos.len(), 1);
        let dst_file_id = infos[0].file_id;
        let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
        // An index (puffin) file only exists when index data was written.
        let mut dst_index_path = None;
        if infos[0].index_metadata.file_size > 0 {
            dst_index_path = Some(format!(
                "{}/index/{}.puffin",
                components.region_dir(),
                dst_file_id
            ));
        }

        println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
        println!("  {}: {}", "Destination file".bold(), dst_file_path.cyan());
        println!("  {}: {}", "Rows".bold(), total_rows.to_string().cyan());
        println!(
            "  {}: {}",
            "File size".bold(),
            format!("{} bytes", file_size).cyan()
        );
        println!(
            "  {}: {:?}",
            "Reader build time".bold(),
            reader_build_elapsed
        );
        println!("  {}: {:?}", "Total time".bold(), write_elapsed);

        println!("  {}: {:?}", "Metrics".bold(), metrics,);

        println!("  {}: {:?}", "Index".bold(), infos[0].index_metadata);

        // Remove the files written by the benchmark so repeated runs do not
        // accumulate garbage in the object store.
        println!("\n{}", "Cleaning up...".yellow());
        object_store.delete(&dst_file_path).await.map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
            }
            .build()
        })?;
        println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);

        if let Some(index_path) = dst_index_path {
            object_store.delete(&index_path).await.map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Failed to delete dest index file {}: {}", index_path, e),
                }
                .build()
            })?;
            println!(
                "{} Temporary index file {} deleted",
                "✓".green(),
                index_path
            );
        }

        println!("\n{}", "Benchmark completed successfully!".green().bold());
        Ok(())
    }
}
374
/// Path components parsed from an SST file's object-store path.
#[derive(Debug)]
struct FileDirComponents {
    /// Catalog name (first segment after the leading `data/`).
    catalog: String,
    /// Schema (database) name.
    schema: String,
    /// Numeric table id.
    table_id: u32,
    /// Region sequence number (suffix of the `{table_id}_{seq}` directory).
    region_sequence: u32,
    /// Whether the file lives under a bare, `data`, or `metadata` layout.
    path_type: PathType,
    /// SST file id (the UUID part of the file name).
    file_id: FileId,
}
384
385impl FileDirComponents {
386 fn table_dir(&self) -> String {
387 format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
388 }
389
390 fn region_dir(&self) -> String {
391 let region_name = region_name(self.table_id, self.region_sequence);
392 match self.path_type {
393 PathType::Bare => {
394 format!(
395 "data/{}/{}/{}/{}",
396 self.catalog, self.schema, self.table_id, region_name
397 )
398 }
399 PathType::Data => {
400 format!(
401 "data/{}/{}/{}/{}/data",
402 self.catalog, self.schema, self.table_id, region_name
403 )
404 }
405 PathType::Metadata => {
406 format!(
407 "data/{}/{}/{}/{}/metadata",
408 self.catalog, self.schema, self.table_id, region_name
409 )
410 }
411 }
412 }
413}
414
415fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
416 let pattern =
418 r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
419
420 let re = Regex::new(pattern).expect("Invalid regex pattern");
422
423 let path_type = if path.contains("/data/") {
425 PathType::Data
426 } else if path.contains("/metadata/") {
427 PathType::Metadata
428 } else {
429 PathType::Bare
430 };
431
432 let components = (|| {
434 let captures = re.captures(path)?;
435 if captures.len() != 7 {
436 return None;
437 }
438 let mut components = FileDirComponents {
439 catalog: "".to_string(),
440 schema: "".to_string(),
441 table_id: 0,
442 region_sequence: 0,
443 path_type,
444 file_id: FileId::default(),
445 };
446 components.catalog = captures.get(1)?.as_str().to_string();
448 components.schema = captures.get(2)?.as_str().to_string();
449 components.table_id = captures[3].parse().ok()?;
450 components.region_sequence = captures[5].parse().ok()?;
451 let file_id_str = &captures[6];
452 components.file_id = FileId::parse_str(file_id_str).ok()?;
453 Some(components)
454 })();
455 components.context(error::IllegalConfigSnafu {
456 msg: format!("Expect valid source file path, got: {}", path),
457 })
458}
459
460fn extract_region_metadata(
461 file_path: &str,
462 meta: &parquet::file::metadata::ParquetMetaData,
463) -> error::Result<RegionMetadataRef> {
464 use parquet::format::KeyValue;
465 let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
466 let Some(kvs) = kvs else {
467 return Err(error::IllegalConfigSnafu {
468 msg: format!("{file_path}: missing parquet key_value metadata"),
469 }
470 .build());
471 };
472 let json = kvs
473 .iter()
474 .find(|kv| kv.key == PARQUET_METADATA_KEY)
475 .and_then(|kv| kv.value.as_ref())
476 .ok_or_else(|| {
477 error::IllegalConfigSnafu {
478 msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
479 }
480 .build()
481 })?;
482 let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
483 error::IllegalConfigSnafu {
484 msg: format!("invalid region metadata json: {e}"),
485 }
486 .build()
487 })?;
488 Ok(Arc::new(region))
489}
490
491async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
492 store::new_object_store(sc.store.clone(), &sc.data_home)
493 .await
494 .map_err(|e| {
495 error::IllegalConfigSnafu {
496 msg: format!("Failed to build object store: {e:?}"),
497 }
498 .build()
499 })
500}
501
/// Builds an `AccessLayer` (plus its cache manager) rooted at the table
/// directory derived from `components`.
///
/// Mutates `config` by sanitizing its index section against `data_home`
/// before the puffin/intermediate managers are created from it.
async fn build_access_layer_simple(
    components: &FileDirComponents,
    object_store: ObjectStore,
    config: &mut MitoConfig,
    data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
    // Resolve relative index paths (aux dir, etc.) against the data home;
    // sanitize errors are deliberately ignored for this benchmark tool.
    let _ = config.index.sanitize(data_home, &config.inverted_index);
    let puffin_manager = PuffinManagerFactory::new(
        &config.index.aux_path,
        config.index.staging_size.as_bytes(),
        Some(config.index.write_buffer_size.as_bytes() as _),
        config.index.staging_ttl,
    )
    .await
    .map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to build access layer: {e:?}"),
        }
        .build()
    })?;

    // Intermediate files for index building live on the local filesystem.
    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build IntermediateManager: {e:?}"),
            }
            .build()
        })?
        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

    let cache_manager =
        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
    let layer = AccessLayer::new(
        components.table_dir(),
        components.path_type,
        object_store,
        puffin_manager,
        intermediate_manager,
    );
    Ok((Arc::new(layer), cache_manager))
}
544
/// Builds a `CacheManager` sized from the Mito config, including the write
/// cache used by the SST writer.
async fn build_cache_manager(
    config: &MitoConfig,
    puffin_manager: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> error::Result<CacheManagerRef> {
    let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build write cache: {e:?}"),
            }
            .build()
        })?;
    // Mirror the cache sizes a real datanode would use so benchmark results
    // stay representative of production behavior.
    let cache_manager = Arc::new(
        CacheManager::builder()
            .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
            .vector_cache_size(config.vector_cache_size.as_bytes())
            .page_cache_size(config.page_cache_size.as_bytes())
            .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
            .index_metadata_size(config.index.metadata_cache_size.as_bytes())
            .index_content_size(config.index.content_cache_size.as_bytes())
            .index_content_page_size(config.index.content_cache_page_size.as_bytes())
            .index_result_cache_size(config.index.result_cache_size.as_bytes())
            .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
            .write_cache(write_cache)
            .build(),
    );
    Ok(cache_manager)
}
574
575fn new_noop_file_purger() -> FilePurgerRef {
576 #[derive(Debug)]
577 struct Noop;
578 impl FilePurger for Noop {
579 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
580 }
581 Arc::new(Noop)
582}
583
584async fn load_parquet_metadata(
585 object_store: ObjectStore,
586 path: &str,
587 file_size: u64,
588) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
589 use parquet::file::FOOTER_SIZE;
590 use parquet::file::metadata::ParquetMetaDataReader;
591 let actual_size = if file_size == 0 {
592 object_store.stat(path).await?.content_length()
593 } else {
594 file_size
595 };
596 if actual_size < FOOTER_SIZE as u64 {
597 return Err("file too small".into());
598 }
599 let prefetch: u64 = 64 * 1024;
600 let start = actual_size.saturating_sub(prefetch);
601 let buffer = object_store
602 .read_with(path)
603 .range(start..actual_size)
604 .await?
605 .to_vec();
606 let buffer_len = buffer.len();
607 let mut footer = [0; 8];
608 footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
609 let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
610 let metadata_len = footer.metadata_length() as u64;
611 if actual_size - (FOOTER_SIZE as u64) < metadata_len {
612 return Err("invalid footer/metadata length".into());
613 }
614 if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
615 let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
616 let meta = ParquetMetaDataReader::decode_metadata(
617 &buffer[metadata_start..buffer_len - FOOTER_SIZE],
618 )?;
619 Ok(meta)
620 } else {
621 let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
622 let data = object_store
623 .read_with(path)
624 .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
625 .await?
626 .to_vec();
627 let meta = ParquetMetaDataReader::decode_metadata(&data)?;
628 Ok(meta)
629 }
630}
631
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    // Covers all three region path layouts: metadata, data, and bare.
    #[test]
    fn test_parse_dir() {
        let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
        let c = parse_file_dir_components(meta_path).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Metadata);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Data);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Bare);
    }

    // Parses the shipped example config; the path is relative to this crate,
    // so the test depends on the repository layout.
    #[test]
    fn test_parse_config() {
        let path = "../../config/datanode.example.toml";
        let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}