// cmd/datanode/scanbench.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::Instant;
19
20use clap::Parser;
21use colored::Colorize;
22use common_base::Plugins;
23use common_error::ext::{BoxedError, PlainError};
24use common_error::status_code::StatusCode;
25use common_meta::cache::{new_schema_cache, new_table_schema_cache};
26use common_meta::key::SchemaMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_wal::config::DatanodeWalConfig;
29use datafusion::execution::SessionStateBuilder;
30use datafusion::logical_expr::{BinaryExpr, Expr as DfExpr, ExprSchemable, Operator};
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchemaRef, ScalarValue, ToDFSchema};
33use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
34use datatypes::arrow::compute;
35use futures::StreamExt;
36use futures::stream::FuturesUnordered;
37use log_store::kafka::log_store::KafkaLogStore;
38use log_store::noop::log_store::NoopLogStore;
39use log_store::raft_engine::log_store::RaftEngineLogStore;
40use mito2::config::MitoConfig;
41use mito2::engine::MitoEngine;
42use mito2::sst::file_ref::FileReferenceManager;
43use moka::future::CacheBuilder;
44use object_store::manager::ObjectStoreManager;
45use object_store::util::normalize_dir;
46use query::optimizer::parallelize_scan::ParallelizeScan;
47use serde::Deserialize;
48use snafu::{OptionExt, ResultExt};
49use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
50use sqlparser::dialect::GenericDialect;
51use sqlparser::parser::Parser as SqlParser;
52use store_api::metadata::RegionMetadata;
53use store_api::path_utils::WAL_DIR;
54use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
55use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
56use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
57use tokio::fs;
58
59use crate::datanode::objbench::{build_object_store, parse_config};
60use crate::error;
61
/// Scan benchmark command - benchmarks scanning a region directly from storage.
// NOTE: the `///` comments on the fields below double as clap `--help` text,
// so they are user-facing strings; internal notes use plain `//` comments.
#[derive(Debug, Parser)]
pub struct ScanbenchCommand {
    /// Path to config TOML file (same format as standalone/datanode config)
    #[clap(long, value_name = "FILE")]
    config: PathBuf,

    /// Region ID: either numeric u64 (e.g. "4398046511104") or "table_id:region_num" (e.g. "1024:0")
    // Parsed by `parse_region_id` in `run()`.
    #[clap(long)]
    region_id: String,

    /// Table directory relative to data home (e.g. "data/greptime/public/1024/")
    #[clap(long)]
    table_dir: String,

    /// Scanner type: seq, unordered, series
    // Mapped in `run()`: seq -> None, unordered -> TimeWindowed, series -> PerSeries.
    #[clap(long, default_value = "seq")]
    scanner: String,

    /// Path to scan request JSON config file (optional)
    // Deserialized into `ScanConfig`; defaults to an empty config when absent.
    #[clap(long, value_name = "FILE")]
    scan_config: Option<PathBuf>,

    /// Number of partitions for parallel scan (simulates parallelism)
    #[clap(long, default_value = "1")]
    parallelism: usize,

    /// Number of iterations for benchmarking
    #[clap(long, default_value = "1")]
    iterations: usize,

    /// Path type for the region: bare, data, metadata
    // Parsed by `parse_path_type` in `run()`.
    #[clap(long, default_value = "bare")]
    path_type: String,

    /// Verbose output
    #[clap(short, long, default_value_t = false)]
    verbose: bool,

    /// Output pprof flamegraph
    // Only effective on unix; other platforms print a warning.
    #[clap(long, value_name = "FILE")]
    pprof_file: Option<PathBuf>,

    /// Enable WAL replay when opening the region.
    #[clap(long, default_value_t = false)]
    enable_wal: bool,

    /// Start pprof after the first iteration (use first iteration as warmup).
    #[clap(long, default_value_t = false)]
    pprof_after_warmup: bool,
}
113
/// JSON config for scan request parameters.
#[derive(Debug, Deserialize, Default)]
struct ScanConfig {
    /// Column indexes to project; mutually exclusive with `projection_names`.
    projection: Option<Vec<usize>>,
    /// Column names to project; resolved against region metadata.
    projection_names: Option<Vec<String>>,
    /// SQL filter expressions, parsed with a generic SQL dialect.
    filters: Option<Vec<String>>,
    /// Optional per-series row selector; only "last_row" is accepted.
    series_row_selector: Option<String>,
}
122
123fn resolve_projection(
124    scan_config: &ScanConfig,
125    metadata: Option<&RegionMetadata>,
126) -> error::Result<Option<Vec<usize>>> {
127    if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
128        return Err(error::IllegalConfigSnafu {
129            msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
130        }
131        .build());
132    }
133
134    if let Some(projection) = &scan_config.projection {
135        return Ok(Some(projection.clone()));
136    }
137
138    if let Some(projection_names) = &scan_config.projection_names {
139        let metadata = metadata.context(error::IllegalConfigSnafu {
140            msg: "Missing region metadata while resolving 'projection_names'".to_string(),
141        })?;
142        let available_columns = metadata
143            .column_metadatas
144            .iter()
145            .map(|column| column.column_schema.name.as_str())
146            .collect::<Vec<_>>()
147            .join(", ");
148        let projection = projection_names
149            .iter()
150            .map(|name| {
151                metadata
152                    .column_index_by_name(name)
153                    .with_context(|| error::IllegalConfigSnafu {
154                        msg: format!(
155                            "Unknown column '{}' in projection_names, available columns: [{}]",
156                            name, available_columns
157                        ),
158                    })
159            })
160            .collect::<error::Result<Vec<_>>>()?;
161        return Ok(Some(projection));
162    }
163
164    Ok(None)
165}
166
/// Formats a byte count as a human-readable string in binary units
/// (B, KiB, MiB, GiB), with two decimal places above the byte range.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = KIB * 1024;
    const GIB: u64 = MIB * 1024;
    // Pick the largest unit the value reaches; plain bytes stay integral.
    match bytes {
        b if b >= GIB => format!("{:.2} GiB", b as f64 / GIB as f64),
        b if b >= MIB => format!("{:.2} MiB", b as f64 / MIB as f64),
        b if b >= KIB => format!("{:.2} KiB", b as f64 / KIB as f64),
        b => format!("{} B", b),
    }
}
181
182fn parse_region_id(s: &str) -> error::Result<RegionId> {
183    if s.contains(':') {
184        let parts: Vec<&str> = s.splitn(2, ':').collect();
185        let table_id: u32 = parts[0].parse().map_err(|e| {
186            error::IllegalConfigSnafu {
187                msg: format!("invalid table_id in region_id '{}': {}", s, e),
188            }
189            .build()
190        })?;
191        let region_num: u32 = parts[1].parse().map_err(|e| {
192            error::IllegalConfigSnafu {
193                msg: format!("invalid region_num in region_id '{}': {}", s, e),
194            }
195            .build()
196        })?;
197        Ok(RegionId::new(table_id, region_num))
198    } else {
199        let id: u64 = s.parse().map_err(|e| {
200            error::IllegalConfigSnafu {
201                msg: format!("invalid region_id '{}': {}", s, e),
202            }
203            .build()
204        })?;
205        Ok(RegionId::from_u64(id))
206    }
207}
208
209fn parse_path_type(s: &str) -> error::Result<PathType> {
210    match s.to_lowercase().as_str() {
211        "bare" => Ok(PathType::Bare),
212        "data" => Ok(PathType::Data),
213        "metadata" => Ok(PathType::Metadata),
214        _ => Err(error::IllegalConfigSnafu {
215            msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
216        }
217        .build()),
218    }
219}
220
/// Rewrites literal values in comparison expressions to match the column's arrow type.
struct LiteralTypeCaster {
    /// Schema used to look up the arrow type of the column side of a comparison.
    schema: DFSchemaRef,
}
225
impl TreeNodeRewriter for LiteralTypeCaster {
    type Node = DfExpr;

    /// Bottom-up visitor: when a comparison pairs a column with a literal of a
    /// different type, the literal is cast to the column's arrow type so the
    /// comparison is well-typed against the region schema.
    fn f_up(&mut self, expr: DfExpr) -> datafusion_common::Result<Transformed<DfExpr>> {
        // Only binary expressions can be column-vs-literal comparisons.
        let DfExpr::BinaryExpr(BinaryExpr { left, op, right }) = &expr else {
            return Ok(Transformed::no(expr));
        };

        // Restrict to plain comparison operators; everything else passes through.
        if !matches!(
            op,
            Operator::Eq
                | Operator::NotEq
                | Operator::Lt
                | Operator::LtEq
                | Operator::Gt
                | Operator::GtEq
        ) {
            return Ok(Transformed::no(expr));
        }

        // Accept either operand order; remember which side the column was on.
        let (col_expr, lit_expr, col_left) = match (left.as_ref(), right.as_ref()) {
            (col @ DfExpr::Column(_), lit @ DfExpr::Literal(_, _)) => (col, lit, true),
            (lit @ DfExpr::Literal(_, _), col @ DfExpr::Column(_)) => (col, lit, false),
            _ => return Ok(Transformed::no(expr)),
        };

        let col_type = col_expr.get_type(self.schema.as_ref())?;
        // The match above guarantees `lit_expr` is a literal.
        let DfExpr::Literal(scalar, _) = lit_expr else {
            unreachable!()
        };

        // Nothing to do when the literal already matches the column type.
        if scalar.data_type() == col_type {
            return Ok(Transformed::no(expr));
        }

        // Cast through a one-element arrow array.
        let lit_array = scalar.to_array()?;
        let casted = compute::cast(lit_array.as_ref(), &col_type).map_err(|e| {
            datafusion_common::DataFusionError::Internal(format!(
                "Failed to cast literal {:?} to {:?}: {}",
                scalar, col_type, e
            ))
        })?;
        let casted_scalar = ScalarValue::try_from_array(&casted, 0)?;

        // NOTE(review): any metadata on the original literal is dropped here
        // (`None`) — presumably irrelevant for filter literals; confirm.
        let new_lit = DfExpr::Literal(casted_scalar, None);
        // Rebuild the comparison preserving the original operand order.
        let (new_left, new_right) = if col_left {
            (left.clone(), Box::new(new_lit))
        } else {
            (Box::new(new_lit), right.clone())
        };

        Ok(Transformed::yes(DfExpr::BinaryExpr(BinaryExpr {
            left: new_left,
            op: *op,
            right: new_right,
        })))
    }
}
284
285fn convert_literal_types(
286    exprs: Vec<DfExpr>,
287    schema: &DFSchemaRef,
288) -> datafusion_common::Result<Vec<DfExpr>> {
289    use datafusion_common::tree_node::TreeNode;
290
291    let mut caster = LiteralTypeCaster {
292        schema: schema.clone(),
293    };
294    exprs
295        .into_iter()
296        .map(|e| e.rewrite(&mut caster).map(|x| x.data))
297        .collect()
298}
299
300fn resolve_filters(
301    scan_config: &ScanConfig,
302    metadata: &RegionMetadata,
303) -> error::Result<Vec<DfExpr>> {
304    let Some(filters) = &scan_config.filters else {
305        return Ok(Vec::new());
306    };
307
308    let df_schema = metadata
309        .schema
310        .arrow_schema()
311        .clone()
312        .to_dfschema()
313        .map_err(|e| {
314            error::IllegalConfigSnafu {
315                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
316            }
317            .build()
318        })?;
319
320    let state = SessionStateBuilder::new()
321        .with_config(Default::default())
322        .with_runtime_env(Default::default())
323        .with_default_features()
324        .build();
325
326    let exprs: Vec<DfExpr> = filters
327        .iter()
328        .enumerate()
329        .map(|(idx, filter)| {
330            let mut parser = SqlParser::new(&GenericDialect {})
331                .try_with_sql(filter)
332                .map_err(|e| {
333                    error::IllegalConfigSnafu {
334                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
335                    }
336                    .build()
337                })?;
338
339            let sql_expr = parser.parse_expr().map_err(|e| {
340                error::IllegalConfigSnafu {
341                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
342                }
343                .build()
344            })?;
345
346            state
347                .create_logical_expr_from_sql_expr(
348                    SqlExprWithAlias {
349                        expr: sql_expr,
350                        alias: None,
351                    },
352                    &df_schema,
353                )
354                .map_err(|e| {
355                    error::IllegalConfigSnafu {
356                        msg: format!(
357                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
358                        ),
359                    }
360                    .build()
361                })
362        })
363        .collect::<error::Result<Vec<_>>>()?;
364
365    let df_schema_ref = Arc::new(df_schema);
366    convert_literal_types(exprs, &df_schema_ref).map_err(|e| {
367        error::IllegalConfigSnafu {
368            msg: format!("Failed to convert filter expression types: {e}"),
369        }
370        .build()
371    })
372}
373
374fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
375    struct NoopPartitionExprFetcher;
376
377    #[async_trait::async_trait]
378    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
379        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
380            None
381        }
382    }
383
384    Arc::new(NoopPartitionExprFetcher)
385}
386
/// Bundle of everything needed to construct a `MitoEngine` except the log
/// store, which varies with the WAL configuration (see `build`).
struct EngineComponents {
    /// Data home directory taken from the storage config.
    data_home: String,
    mito_config: MitoConfig,
    object_store_manager: Arc<ObjectStoreManager>,
    schema_metadata_manager: Arc<SchemaMetadataManager>,
    file_ref_manager: Arc<FileReferenceManager>,
    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
}
395
396impl EngineComponents {
397    async fn build<S: store_api::logstore::LogStore>(
398        self,
399        log_store: Arc<S>,
400    ) -> error::Result<MitoEngine> {
401        MitoEngine::new(
402            &self.data_home,
403            self.mito_config,
404            log_store,
405            self.object_store_manager,
406            self.schema_metadata_manager,
407            self.file_ref_manager,
408            self.partition_expr_fetcher,
409            Plugins::default(),
410        )
411        .await
412        .map_err(BoxedError::new)
413        .context(error::BuildCliSnafu)
414    }
415}
416
417fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
418    let kv_backend = Arc::new(MemoryKvBackend::new());
419    let table_schema_cache = Arc::new(new_table_schema_cache(
420        "table_schema_name_cache".to_string(),
421        CacheBuilder::default().build(),
422        kv_backend.clone(),
423    ));
424    let schema_cache = Arc::new(new_schema_cache(
425        "schema_cache".to_string(),
426        CacheBuilder::default().build(),
427        kv_backend.clone(),
428    ));
429    Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
430}
431
impl ScanbenchCommand {
    /// Runs the scan benchmark end to end:
    /// 1. builds a `MitoEngine` from the config file (WAL backend optional),
    /// 2. opens the target region directly from storage,
    /// 3. scans it `iterations` times with the configured scanner/parallelism,
    /// 4. optionally captures a pprof flamegraph (unix only).
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting scanbench...".cyan().bold());

        let region_id = parse_region_id(&self.region_id)?;
        let path_type = parse_path_type(&self.path_type)?;
        println!(
            "{} Region ID: {} (u64: {})",
            "✓".green(),
            self.region_id,
            region_id.as_u64()
        );

        // Parse config and build object store
        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
        println!("{} Config parsed", "✓".green());

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        let object_store_manager =
            Arc::new(ObjectStoreManager::new("default", object_store.clone()));

        // Create mock dependencies
        let schema_metadata_manager = mock_schema_metadata_manager();
        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
        let partition_expr_fetcher = noop_partition_expr_fetcher();

        // Create MitoEngine with appropriate log store
        let components = EngineComponents {
            data_home: store_cfg.data_home.clone(),
            mito_config,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
        };

        // A real WAL backend is only constructed when replay was requested;
        // otherwise a no-op log store is enough for read-only scanning.
        let engine = match &wal_config {
            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
                let data_home = normalize_dir(&store_cfg.data_home);
                let wal_dir = match &raft_engine_config.dir {
                    Some(dir) => dir.clone(),
                    None => format!("{}{WAL_DIR}", data_home),
                };
                fs::create_dir_all(&wal_dir).await.map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
                    }
                    .build()
                })?;
                let log_store = Arc::new(
                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using RaftEngine WAL", "✓".green());
                components.build(log_store).await?
            }
            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
                let log_store = Arc::new(
                    KafkaLogStore::try_new(kafka_config, None)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using Kafka WAL", "✓".green());
                components.build(log_store).await?
            }
            _ => {
                let log_store = Arc::new(NoopLogStore);
                println!(
                    "{} Using NoopLogStore (enable_wal={})",
                    "✓".green(),
                    self.enable_wal
                );
                components.build(log_store).await?
            }
        };

        // Open region
        let open_request = RegionOpenRequest {
            engine: "mito".to_string(),
            table_dir: self.table_dir.clone(),
            path_type,
            options: HashMap::default(),
            // Skip replay unless the WAL was explicitly enabled.
            skip_wal_replay: !self.enable_wal,
            checkpoint: None,
        };

        engine
            .handle_request(region_id, RegionRequest::Open(open_request))
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        println!("{} Region opened", "✓".green());

        // Load scan config
        let scan_config = if let Some(path) = &self.scan_config {
            let content = tokio::fs::read_to_string(path)
                .await
                .context(error::FileIoSnafu)?;
            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
        } else {
            ScanConfig::default()
        };
        let metadata = engine
            .get_metadata(region_id)
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        let projection = resolve_projection(&scan_config, Some(&metadata))?;
        let filters = resolve_filters(&scan_config, &metadata)?;

        // Build scan request
        let distribution = match self.scanner.as_str() {
            "seq" => None,
            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
            "series" => Some(TimeSeriesDistribution::PerSeries),
            other => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!(
                        "Unknown scanner type '{}', expected: seq, unordered, series",
                        other
                    ),
                }
                .build());
            }
        };

        let series_row_selector = match scan_config.series_row_selector.as_deref() {
            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
            Some(other) => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!("Unknown series_row_selector '{}'", other),
                }
                .build());
            }
            None => None,
        };

        println!(
            "{} Scanner: {}, Parallelism: {}, Iterations: {}",
            "ℹ".blue(),
            self.scanner,
            self.parallelism,
            self.iterations,
        );

        // Start profiling if pprof_file is specified (unless pprof_after_warmup is set)
        #[cfg(unix)]
        let mut profiler_guard = if self.pprof_file.is_some() && !self.pprof_after_warmup {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("Failed to start profiler: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let mut total_rows_all = 0u64;
        let mut total_elapsed_all = std::time::Duration::ZERO;

        for iteration in 0..self.iterations {
            // A fresh request per iteration; the projection/filters are cloned
            // since ScanRequest takes ownership.
            let request = ScanRequest {
                projection: projection.clone(),
                filters: filters.clone(),
                series_row_selector,
                distribution,
                ..Default::default()
            };

            let start = Instant::now();

            // Get scanner
            let mut scanner = engine
                .handle_query(region_id, request)
                .await
                .map_err(BoxedError::new)
                .context(error::BuildCliSnafu)?;

            // Get partition ranges and apply parallelism
            let original_partitions = scanner.properties().partitions.clone();
            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();

            if self.verbose {
                println!(
                    "  {} Original partitions: {}, total ranges: {}",
                    "ℹ".blue(),
                    original_partitions.len(),
                    total_ranges
                );
            }

            if self.parallelism > 1 {
                // Flatten all ranges
                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();

                // Distribute ranges across partitions
                let mut partitions =
                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);

                // Sort ranges within each partition by start time ascending
                for partition in &mut partitions {
                    partition.sort_by_key(|a| a.start);
                }

                scanner
                    .prepare(
                        PrepareRequest::default()
                            .with_ranges(partitions)
                            .with_target_partitions(self.parallelism),
                    )
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;
            }

            // Scan all partitions
            let num_partitions = scanner.properties().partitions.len();
            let ctx = QueryScanContext {
                explain_verbose: self.verbose,
            };
            let metrics_set = ExecutionPlanMetricsSet::new();

            // Each partition is drained on its own tokio task; per-task
            // (rows, array_mem_size, estimated_size) tuples are merged below
            // as tasks complete, in whatever order they finish.
            let mut scan_futures = FuturesUnordered::new();

            for partition_idx in 0..num_partitions {
                let mut stream = scanner
                    .scan_partition(&ctx, &metrics_set, partition_idx)
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;

                scan_futures.push(tokio::spawn(async move {
                    let mut rows = 0u64;
                    let mut array_mem_size = 0u64;
                    let mut estimated_size = 0u64;
                    while let Some(batch_result) = stream.next().await {
                        match batch_result {
                            Ok(batch) => {
                                rows += batch.num_rows() as u64;
                                let df_batch = batch.df_record_batch();
                                array_mem_size += df_batch.get_array_memory_size() as u64;
                                estimated_size +=
                                    mito2::memtable::record_batch_estimated_size(df_batch) as u64;
                            }
                            Err(e) => {
                                return Err(BoxedError::new(e));
                            }
                        }
                    }
                    Ok::<(u64, u64, u64), BoxedError>((rows, array_mem_size, estimated_size))
                }));
            }

            let mut total_rows = 0u64;
            let mut total_array_mem_size = 0u64;
            let mut total_estimated_size = 0u64;
            while let Some(task) = scan_futures.next().await {
                // Outer error: the task itself panicked/was cancelled.
                let result = task
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("scan task failed: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?;
                // Inner error: the scan stream itself yielded an error.
                let (rows, array_mem_size, estimated_size) =
                    result.context(error::BuildCliSnafu)?;
                total_rows += rows;
                total_array_mem_size += array_mem_size;
                total_estimated_size += estimated_size;
            }

            let elapsed = start.elapsed();
            total_rows_all += total_rows;
            total_elapsed_all += elapsed;

            println!(
                "  [iter {}] {} rows in {:?} ({} partitions), array_mem_size: {}, estimated_size: {}",
                iteration + 1,
                total_rows.to_string().cyan(),
                elapsed,
                num_partitions,
                format_bytes(total_array_mem_size),
                format_bytes(total_estimated_size),
            );

            // Start profiling after the first iteration (warmup) if pprof_after_warmup is set
            #[cfg(unix)]
            if iteration == 0
                && self.pprof_after_warmup
                && self.pprof_file.is_some()
                && profiler_guard.is_none()
            {
                println!(
                    "{} Starting profiling after warmup iteration...",
                    "⚡".yellow()
                );
                profiler_guard = Some(
                    pprof::ProfilerGuardBuilder::default()
                        .frequency(99)
                        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                        .build()
                        .map_err(|e| {
                            BoxedError::new(PlainError::new(
                                format!("Failed to start profiler: {e}"),
                                StatusCode::Unexpected,
                            ))
                        })
                        .context(error::BuildCliSnafu)?,
                );
            }
        }

        // Stop profiling and generate flamegraph if enabled
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            // Flamegraph failures are reported but do not fail the benchmark.
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }

        // Summary
        if self.iterations > 1 {
            let avg_elapsed = total_elapsed_all / self.iterations as u32;
            let avg_rows = total_rows_all / self.iterations as u64;
            println!(
                "\n{} Average: {} rows in {:?} over {} iterations",
                "Summary".green().bold(),
                avg_rows.to_string().cyan(),
                avg_elapsed,
                self.iterations,
            );
        }

        println!("\n{}", "Benchmark completed!".green().bold());
        Ok(())
    }
}
816
#[cfg(test)]
mod tests {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use sqlparser::ast::{BinaryOperator, Expr};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::{ScanConfig, resolve_filters, resolve_projection};
    use crate::error;

    /// Deserializing JSON with only `projection_names` fills that field and
    /// leaves the index-based `projection` unset.
    #[test]
    fn test_parse_scan_config_projection_names() {
        let parsed: ScanConfig =
            serde_json::from_str(r#"{"projection_names":["host","ts"]}"#).unwrap();

        assert_eq!(parsed.projection, None);
        assert_eq!(
            parsed.projection_names,
            Some(vec!["host".to_string(), "ts".to_string()])
        );
    }

    /// Index-based projection needs no region metadata and passes through as-is.
    #[test]
    fn test_resolve_projection_by_indexes() -> error::Result<()> {
        let cfg = ScanConfig {
            projection: Some(vec![0, 2]),
            projection_names: None,
            filters: None,
            series_row_selector: None,
        };

        assert_eq!(resolve_projection(&cfg, None)?, Some(vec![0, 2]));
        Ok(())
    }

    /// Name-based projection cannot be resolved without region metadata and
    /// must surface a descriptive error.
    #[test]
    fn test_resolve_projection_by_names_without_metadata() {
        let cfg = ScanConfig {
            projection: None,
            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let message = resolve_projection(&cfg, None).unwrap_err().to_string();
        assert!(message.contains("Missing region metadata while resolving 'projection_names'"));
    }

    /// Supplying both `projection` and `projection_names` is rejected, and the
    /// error mentions both conflicting fields.
    #[test]
    fn test_resolve_projection_conflict_fields() {
        let cfg = ScanConfig {
            projection: Some(vec![0]),
            projection_names: Some(vec!["host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let message = resolve_projection(&cfg, None).unwrap_err().to_string();
        assert!(message.contains("projection"));
        assert!(message.contains("projection_names"));
    }

    /// Sanity check that sqlparser parses a compound filter string into a
    /// top-level AND binary expression.
    #[test]
    fn test_sqlparser_parse_expr_string() {
        let expr = Parser::new(&GenericDialect {})
            .try_with_sql("host = 'web-1' AND cpu > 80")
            .unwrap()
            .parse_expr()
            .unwrap();

        match expr {
            Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
            other => panic!("expected BinaryOp, got: {other:?}"),
        }
    }

    /// A filter literal against a UInt32 tag column is coerced to a UInt32
    /// scalar when filters are resolved against region metadata.
    #[test]
    fn test_resolve_filters_uint32_type_conversion() {
        use api::v1::SemanticType;

        // Region with a UInt32 tag column and a timestamp column.
        let metadata = {
            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 0));
            builder
                .push_column_metadata(ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "table_id",
                        ConcreteDataType::uint32_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 1,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Timestamp,
                    column_id: 2,
                })
                .primary_key(vec![1]);
            builder.build().unwrap()
        };

        let cfg = ScanConfig {
            projection: None,
            projection_names: None,
            filters: Some(vec!["table_id = 1117".to_string()]),
            series_row_selector: None,
        };

        let exprs = resolve_filters(&cfg, &metadata).unwrap();
        assert_eq!(exprs.len(), 1);
        // The expression should contain a UInt32 literal after type conversion.
        let expr_str = exprs[0].to_string();
        assert!(
            expr_str.contains("UInt32(1117)"),
            "Expected UInt32(1117) in expression, got: {expr_str}"
        );
    }

    /// Deserializing JSON with a `filters` array keeps each filter string intact.
    #[test]
    fn test_parse_scan_config_filters() {
        let parsed: ScanConfig =
            serde_json::from_str(r#"{"filters":["host = 'web-1'","cpu > 80"]}"#).unwrap();

        let expected = vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()];
        assert_eq!(parsed.filters, Some(expected));
    }
}