// cmd/datanode/objbench.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Instant;
18
19use clap::Parser;
20use colored::Colorize;
21use datanode::config::RegionEngineConfig;
22use datanode::store;
23use either::Either;
24use mito2::access_layer::{
25    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
26};
27use mito2::cache::{CacheManager, CacheManagerRef};
28use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29use mito2::read::Source;
30use mito2::sst::file::{FileHandle, FileMeta};
31use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
32use mito2::sst::index::intermediate::IntermediateManager;
33use mito2::sst::index::puffin_manager::PuffinManagerFactory;
34use mito2::sst::parquet::reader::ParquetReaderBuilder;
35use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
36use mito2::worker::write_cache_from_config;
37use object_store::ObjectStore;
38use regex::Regex;
39use snafu::OptionExt;
40use store_api::metadata::{RegionMetadata, RegionMetadataRef};
41use store_api::path_utils::region_name;
42use store_api::region_request::PathType;
43use store_api::storage::FileId;
44
45use crate::datanode::{StorageConfig, StorageConfigWrapper};
46use crate::error;
47
// Benchmarks object storage by rewriting an existing SST file through the
// Mito access layer (see `ObjbenchCommand::run`). NOTE: the `///` doc
// comments below double as clap-generated CLI help text, so they are
// user-facing strings and must not be reworded casually.
/// Object storage benchmark command
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
    /// Path to the object-store config file (TOML). Must deserialize into object_store::config::ObjectStoreConfig.
    #[clap(long, value_name = "FILE")]
    pub config: PathBuf,

    /// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
    #[clap(long, value_name = "PATH")]
    pub source: String,

    /// Verbose output
    #[clap(short, long, default_value_t = false)]
    pub verbose: bool,

    /// Output file path for pprof flamegraph (enables profiling)
    #[clap(long, value_name = "FILE")]
    pub pprof_file: Option<PathBuf>,
}
67
68fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
69    let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
70        error::IllegalConfigSnafu {
71            msg: format!("failed to read config {}: {e}", config_path.display()),
72        }
73        .build()
74    })?;
75
76    let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
77        error::IllegalConfigSnafu {
78            msg: format!("failed to parse config {}: {e}", config_path.display()),
79        }
80        .build()
81    })?;
82
83    let storage_config = store_cfg.storage;
84    let mito_engine_config = store_cfg
85        .region_engine
86        .into_iter()
87        .filter_map(|c| {
88            if let RegionEngineConfig::Mito(mito) = c {
89                Some(mito)
90            } else {
91                None
92            }
93        })
94        .next()
95        .with_context(|| error::IllegalConfigSnafu {
96            msg: format!("Engine config not found in {:?}", config_path),
97        })?;
98    Ok((storage_config, mito_engine_config))
99}
100
impl ObjbenchCommand {
    /// Runs the benchmark end to end: parses the config, reads the source
    /// SST's parquet metadata, rewrites the file through the Mito access
    /// layer (optionally under a pprof profiler on unix), prints timings and
    /// metrics, then deletes the files the write produced.
    ///
    /// NOTE(review): cleanup only runs on the success path — an error after
    /// `write_sst` leaves the freshly written SST behind; confirm this is
    /// acceptable for a benchmark tool.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting objbench with config:".cyan().bold());

        // Build object store from config
        let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        // Prepare source identifiers
        let components = parse_file_dir_components(&self.source)?;
        println!(
            "{} Source path parsed: {}, components: {:?}",
            "✓".green(),
            self.source,
            components
        );

        // Load parquet metadata to extract RegionMetadata and file stats
        println!("{}", "Loading parquet metadata...".yellow());
        let file_size = object_store
            .stat(&self.source)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("stat failed: {e}"),
                }
                .build()
            })?
            .content_length();
        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("read parquet metadata failed: {e}"),
                }
                .build()
            })?;

        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
        let num_row_groups = parquet_meta.num_row_groups() as u64;
        // Largest per-row-group uncompressed byte size; recorded in FileMeta below.
        let max_row_group_uncompressed_size: u64 = parquet_meta
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);

        println!(
            "{} Metadata loaded - rows: {}, size: {} bytes",
            "✓".green(),
            num_rows,
            file_size
        );

        // Build a FileHandle for the source file. Fields not derivable from
        // the parquet metadata (time range, indexes, sequence, ...) are left
        // at their defaults — the benchmark only needs enough for reading.
        let file_meta = FileMeta {
            region_id: region_meta.region_id,
            file_id: components.file_id,
            time_range: Default::default(),
            level: 0,
            file_size,
            max_row_group_uncompressed_size,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        // The noop purger keeps the engine from deleting the source file.
        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());

        // Build the reader for a single file via ParquetReaderBuilder
        let table_dir = components.table_dir();
        let (src_access_layer, cache_manager) = build_access_layer_simple(
            &components,
            object_store.clone(),
            &mut mito_engine_config,
            &store_cfg.data_home,
        )
        .await?;
        let reader_build_start = Instant::now();

        let reader = ParquetReaderBuilder::new(
            table_dir,
            components.path_type,
            src_handle.clone(),
            object_store.clone(),
        )
        .expected_metadata(Some(region_meta.clone()))
        .build()
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("build reader failed: {e:?}"),
            }
            .build()
        })?;

        let reader_build_elapsed = reader_build_start.elapsed();
        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
        println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);

        // Build write request. Full-text index creation on compaction is
        // disabled so the write path matches a plain flush.
        let fulltext_index_config = FulltextIndexConfig {
            create_on_compaction: Mode::Disable,
            ..Default::default()
        };

        let write_req = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: region_meta,
            source: Either::Left(Source::Reader(Box::new(reader))),
            cache_manager,
            storage: None,
            max_sequence: None,
            index_options: Default::default(),
            index_config: mito_engine_config.index.clone(),
            inverted_index_config: MitoConfig::default().inverted_index,
            fulltext_index_config,
            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
        };

        // Write SST
        println!("{}", "Writing SST...".yellow());

        // Start profiling if pprof_file is specified
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    // Skip sampling inside low-level runtime frames.
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        error::IllegalConfigSnafu {
                            msg: format!("Failed to start profiler: {e}"),
                        }
                        .build()
                    })?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let write_start = Instant::now();
        let mut metrics = Metrics::new(WriteType::Flush);
        let infos = src_access_layer
            .write_sst(write_req, &WriteOptions::default(), &mut metrics)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("write_sst failed: {e:?}"),
                }
                .build()
            })?;

        let write_elapsed = write_start.elapsed();

        // Stop profiling and generate flamegraph if enabled. Profiler errors
        // are reported but do not fail the benchmark.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }
        // A single-source flush is expected to produce exactly one SST.
        assert_eq!(infos.len(), 1);
        let dst_file_id = infos[0].file_id;
        let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
        let mut dst_index_path = None;
        // A non-zero index size means a puffin index file was also written.
        if infos[0].index_metadata.file_size > 0 {
            dst_index_path = Some(format!(
                "{}/index/{}.puffin",
                components.region_dir(),
                dst_file_id
            ));
        }

        // Report results with ANSI colors
        println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
        println!("  {}: {}", "Destination file".bold(), dst_file_path.cyan());
        println!("  {}: {}", "Rows".bold(), total_rows.to_string().cyan());
        println!(
            "  {}: {}",
            "File size".bold(),
            format!("{} bytes", file_size).cyan()
        );
        println!(
            "  {}: {:?}",
            "Reader build time".bold(),
            reader_build_elapsed
        );
        println!("  {}: {:?}", "Total time".bold(), write_elapsed);

        // Print metrics in a formatted way
        println!("  {}: {:?}", "Metrics".bold(), metrics,);

        // Print infos
        println!("  {}: {:?}", "Index".bold(), infos[0].index_metadata);

        // Cleanup: remove the SST (and index, if any) this run created.
        println!("\n{}", "Cleaning up...".yellow());
        object_store.delete(&dst_file_path).await.map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
            }
            .build()
        })?;
        println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);

        if let Some(index_path) = dst_index_path {
            object_store.delete(&index_path).await.map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Failed to delete dest index file {}: {}", index_path, e),
                }
                .build()
            })?;
            println!(
                "{} Temporary index file {} deleted",
                "✓".green(),
                index_path
            );
        }

        println!("\n{}", "Benchmark completed successfully!".green().bold());
        Ok(())
    }
}
374
/// Components parsed out of an SST file path in object storage.
///
/// See `parse_file_dir_components` for the accepted path layouts.
#[derive(Debug)]
struct FileDirComponents {
    /// Catalog name (first segment after the `data/` prefix).
    catalog: String,
    /// Schema name.
    schema: String,
    /// Numeric table id; appears both as a directory and in the region name.
    table_id: u32,
    /// Region sequence parsed from the `<table_id>_<sequence>` directory.
    region_sequence: u32,
    /// Whether the file sits directly in the region dir (`Bare`) or under a
    /// `data`/`metadata` subdirectory.
    path_type: PathType,
    /// File id parsed from the parquet file stem (a UUID).
    file_id: FileId,
}
384
385impl FileDirComponents {
386    fn table_dir(&self) -> String {
387        format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
388    }
389
390    fn region_dir(&self) -> String {
391        let region_name = region_name(self.table_id, self.region_sequence);
392        match self.path_type {
393            PathType::Bare => {
394                format!(
395                    "data/{}/{}/{}/{}",
396                    self.catalog, self.schema, self.table_id, region_name
397                )
398            }
399            PathType::Data => {
400                format!(
401                    "data/{}/{}/{}/{}/data",
402                    self.catalog, self.schema, self.table_id, region_name
403                )
404            }
405            PathType::Metadata => {
406                format!(
407                    "data/{}/{}/{}/{}/metadata",
408                    self.catalog, self.schema, self.table_id, region_name
409                )
410            }
411        }
412    }
413}
414
415fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
416    // Define the regex pattern to match all three path styles
417    let pattern =
418        r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
419
420    // Compile the regex
421    let re = Regex::new(pattern).expect("Invalid regex pattern");
422
423    // Determine the path type
424    let path_type = if path.contains("/data/") {
425        PathType::Data
426    } else if path.contains("/metadata/") {
427        PathType::Metadata
428    } else {
429        PathType::Bare
430    };
431
432    // Try to match the path
433    let components = (|| {
434        let captures = re.captures(path)?;
435        if captures.len() != 7 {
436            return None;
437        }
438        let mut components = FileDirComponents {
439            catalog: "".to_string(),
440            schema: "".to_string(),
441            table_id: 0,
442            region_sequence: 0,
443            path_type,
444            file_id: FileId::default(),
445        };
446        // Extract the components
447        components.catalog = captures.get(1)?.as_str().to_string();
448        components.schema = captures.get(2)?.as_str().to_string();
449        components.table_id = captures[3].parse().ok()?;
450        components.region_sequence = captures[5].parse().ok()?;
451        let file_id_str = &captures[6];
452        components.file_id = FileId::parse_str(file_id_str).ok()?;
453        Some(components)
454    })();
455    components.context(error::IllegalConfigSnafu {
456        msg: format!("Expect valid source file path, got: {}", path),
457    })
458}
459
460fn extract_region_metadata(
461    file_path: &str,
462    meta: &parquet::file::metadata::ParquetMetaData,
463) -> error::Result<RegionMetadataRef> {
464    use parquet::format::KeyValue;
465    let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
466    let Some(kvs) = kvs else {
467        return Err(error::IllegalConfigSnafu {
468            msg: format!("{file_path}: missing parquet key_value metadata"),
469        }
470        .build());
471    };
472    let json = kvs
473        .iter()
474        .find(|kv| kv.key == PARQUET_METADATA_KEY)
475        .and_then(|kv| kv.value.as_ref())
476        .ok_or_else(|| {
477            error::IllegalConfigSnafu {
478                msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
479            }
480            .build()
481        })?;
482    let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
483        error::IllegalConfigSnafu {
484            msg: format!("invalid region metadata json: {e}"),
485        }
486        .build()
487    })?;
488    Ok(Arc::new(region))
489}
490
491async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
492    store::new_object_store(sc.store.clone(), &sc.data_home)
493        .await
494        .map_err(|e| {
495            error::IllegalConfigSnafu {
496                msg: format!("Failed to build object store: {e:?}"),
497            }
498            .build()
499        })
500}
501
502async fn build_access_layer_simple(
503    components: &FileDirComponents,
504    object_store: ObjectStore,
505    config: &mut MitoConfig,
506    data_home: &str,
507) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
508    let _ = config.index.sanitize(data_home, &config.inverted_index);
509    let puffin_manager = PuffinManagerFactory::new(
510        &config.index.aux_path,
511        config.index.staging_size.as_bytes(),
512        Some(config.index.write_buffer_size.as_bytes() as _),
513        config.index.staging_ttl,
514    )
515    .await
516    .map_err(|e| {
517        error::IllegalConfigSnafu {
518            msg: format!("Failed to build access layer: {e:?}"),
519        }
520        .build()
521    })?;
522
523    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
524        .await
525        .map_err(|e| {
526            error::IllegalConfigSnafu {
527                msg: format!("Failed to build IntermediateManager: {e:?}"),
528            }
529            .build()
530        })?
531        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
532
533    let cache_manager =
534        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
535    let layer = AccessLayer::new(
536        components.table_dir(),
537        components.path_type,
538        object_store,
539        puffin_manager,
540        intermediate_manager,
541    );
542    Ok((Arc::new(layer), cache_manager))
543}
544
545async fn build_cache_manager(
546    config: &MitoConfig,
547    puffin_manager: PuffinManagerFactory,
548    intermediate_manager: IntermediateManager,
549) -> error::Result<CacheManagerRef> {
550    let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
551        .await
552        .map_err(|e| {
553            error::IllegalConfigSnafu {
554                msg: format!("Failed to build write cache: {e:?}"),
555            }
556            .build()
557        })?;
558    let cache_manager = Arc::new(
559        CacheManager::builder()
560            .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
561            .vector_cache_size(config.vector_cache_size.as_bytes())
562            .page_cache_size(config.page_cache_size.as_bytes())
563            .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
564            .index_metadata_size(config.index.metadata_cache_size.as_bytes())
565            .index_content_size(config.index.content_cache_size.as_bytes())
566            .index_content_page_size(config.index.content_cache_page_size.as_bytes())
567            .index_result_cache_size(config.index.result_cache_size.as_bytes())
568            .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
569            .write_cache(write_cache)
570            .build(),
571    );
572    Ok(cache_manager)
573}
574
575fn new_noop_file_purger() -> FilePurgerRef {
576    #[derive(Debug)]
577    struct Noop;
578    impl FilePurger for Noop {
579        fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
580    }
581    Arc::new(Noop)
582}
583
/// Loads the parquet metadata for `path` using at most two ranged reads:
/// a tail prefetch (64 KiB), plus a second read only when the metadata block
/// is larger than the prefetched tail.
///
/// `file_size == 0` means "unknown" and triggers a `stat` call.
async fn load_parquet_metadata(
    object_store: ObjectStore,
    path: &str,
    file_size: u64,
) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
    use parquet::file::FOOTER_SIZE;
    use parquet::file::metadata::ParquetMetaDataReader;
    let actual_size = if file_size == 0 {
        object_store.stat(path).await?.content_length()
    } else {
        file_size
    };
    // A valid parquet file is at least one footer long.
    if actual_size < FOOTER_SIZE as u64 {
        return Err("file too small".into());
    }
    // Prefetch the last 64 KiB; most metadata blocks fit in this window.
    let prefetch: u64 = 64 * 1024;
    let start = actual_size.saturating_sub(prefetch);
    let buffer = object_store
        .read_with(path)
        .range(start..actual_size)
        .await?
        .to_vec();
    let buffer_len = buffer.len();
    // The footer is the trailing FOOTER_SIZE (8) bytes: metadata length + magic.
    let mut footer = [0; 8];
    footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
    let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
    let metadata_len = footer.metadata_length() as u64;
    // The metadata block must fit between the file start and the footer.
    if actual_size - (FOOTER_SIZE as u64) < metadata_len {
        return Err("invalid footer/metadata length".into());
    }
    if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
        // Fast path: the whole metadata block is already in the prefetched tail.
        let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
        let meta = ParquetMetaDataReader::decode_metadata(
            &buffer[metadata_start..buffer_len - FOOTER_SIZE],
        )?;
        Ok(meta)
    } else {
        // Slow path: metadata exceeds the prefetch window; issue a second
        // ranged read covering exactly the metadata block.
        let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
        let data = object_store
            .read_with(path)
            .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
            .await?
            .to_vec();
        let meta = ParquetMetaDataReader::decode_metadata(&data)?;
        Ok(meta)
    }
}
631
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    /// Covers all three accepted path layouts: metadata, data, and bare.
    #[test]
    fn test_parse_dir() {
        let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
        let c = parse_file_dir_components(meta_path).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Metadata);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Data);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Bare);
    }

    /// Smoke-tests config parsing against the repository's example datanode
    /// config; depends on the relative path from this crate.
    #[test]
    fn test_parse_config() {
        let path = "../../config/datanode.example.toml";
        let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}