1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::Instant;
19
20use clap::Parser;
21use colored::Colorize;
22use common_base::Plugins;
23use common_error::ext::{BoxedError, PlainError};
24use common_error::status_code::StatusCode;
25use common_meta::cache::{new_schema_cache, new_table_schema_cache};
26use common_meta::key::SchemaMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_wal::config::DatanodeWalConfig;
29use datafusion::execution::SessionStateBuilder;
30use datafusion::logical_expr::{BinaryExpr, Expr as DfExpr, ExprSchemable, Operator};
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchemaRef, ScalarValue, ToDFSchema};
33use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
34use datatypes::arrow::compute;
35use futures::StreamExt;
36use futures::stream::FuturesUnordered;
37use log_store::kafka::log_store::KafkaLogStore;
38use log_store::noop::log_store::NoopLogStore;
39use log_store::raft_engine::log_store::RaftEngineLogStore;
40use mito2::config::MitoConfig;
41use mito2::engine::MitoEngine;
42use mito2::sst::file_ref::FileReferenceManager;
43use moka::future::CacheBuilder;
44use object_store::manager::ObjectStoreManager;
45use object_store::util::normalize_dir;
46use query::optimizer::parallelize_scan::ParallelizeScan;
47use serde::Deserialize;
48use snafu::{OptionExt, ResultExt};
49use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
50use sqlparser::dialect::GenericDialect;
51use sqlparser::parser::Parser as SqlParser;
52use store_api::metadata::RegionMetadata;
53use store_api::path_utils::WAL_DIR;
54use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
55use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
56use store_api::storage::{
57 ProjectionInput, RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
58};
59use tokio::fs;
60
61use crate::datanode::objbench::{build_object_store, parse_config};
62use crate::error;
63
// CLI arguments for the region scan benchmark.
//
// NOTE(review): comments here are deliberately `//` rather than `///` —
// clap's derive turns doc comments into `--help` text, which would change
// observable CLI output.
#[derive(Debug, Parser)]
pub struct ScanbenchCommand {
    // Path to the datanode-style config file (store/mito/WAL sections).
    #[clap(long, value_name = "FILE")]
    config: PathBuf,

    // Region to scan, either "<table_id>:<region_num>" or a plain u64.
    #[clap(long)]
    region_id: String,

    // Table directory of the region inside the object store.
    #[clap(long)]
    table_dir: String,

    // Scanner kind: "seq", "unordered", or "series".
    #[clap(long, default_value = "seq")]
    scanner: String,

    // Optional JSON file with projection / filters / series_row_selector.
    #[clap(long, value_name = "FILE")]
    scan_config: Option<PathBuf>,

    // Target number of scan partitions to read concurrently.
    #[clap(long, default_value = "1")]
    parallelism: usize,

    // How many times to repeat the full scan.
    #[clap(long, default_value = "1")]
    iterations: usize,

    // Region path layout: "bare", "data", or "metadata".
    #[clap(long, default_value = "bare")]
    path_type: String,

    // Enables verbose logging and extra per-iteration diagnostics.
    #[clap(short, long, default_value_t = false)]
    verbose: bool,

    // When set (unix only), writes a pprof flamegraph SVG to this file.
    #[clap(long, value_name = "FILE")]
    pprof_file: Option<PathBuf>,

    // Replays the WAL on region open instead of skipping it.
    #[clap(long, default_value_t = false)]
    enable_wal: bool,

    // Delays profiler start until after the first (warmup) iteration.
    #[clap(long, default_value_t = false)]
    pprof_after_warmup: bool,
}
115
// Deserialized shape of the optional `--scan-config` JSON file.
#[derive(Debug, Deserialize, Default)]
struct ScanConfig {
    // Projected column indexes; mutually exclusive with `projection_names`.
    projection: Option<Vec<usize>>,
    // Projected column names, resolved against region metadata.
    projection_names: Option<Vec<String>>,
    // SQL expression strings parsed into DataFusion filter exprs.
    filters: Option<Vec<String>>,
    // Currently only "last_row" is accepted.
    series_row_selector: Option<String>,
}
124
125fn resolve_projection(
126 scan_config: &ScanConfig,
127 metadata: Option<&RegionMetadata>,
128) -> error::Result<Option<Vec<usize>>> {
129 if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
130 return Err(error::IllegalConfigSnafu {
131 msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
132 }
133 .build());
134 }
135
136 if let Some(projection) = &scan_config.projection {
137 return Ok(Some(projection.clone()));
138 }
139
140 if let Some(projection_names) = &scan_config.projection_names {
141 let metadata = metadata.context(error::IllegalConfigSnafu {
142 msg: "Missing region metadata while resolving 'projection_names'".to_string(),
143 })?;
144 let available_columns = metadata
145 .column_metadatas
146 .iter()
147 .map(|column| column.column_schema.name.as_str())
148 .collect::<Vec<_>>()
149 .join(", ");
150 let projection = projection_names
151 .iter()
152 .map(|name| {
153 metadata
154 .column_index_by_name(name)
155 .with_context(|| error::IllegalConfigSnafu {
156 msg: format!(
157 "Unknown column '{}' in projection_names, available columns: [{}]",
158 name, available_columns
159 ),
160 })
161 })
162 .collect::<error::Result<Vec<_>>>()?;
163 return Ok(Some(projection));
164 }
165
166 Ok(None)
167}
168
/// Renders a byte count as a human-readable string using binary units
/// (B, KiB, MiB, GiB), with two decimal places once a unit threshold is met.
fn format_bytes(bytes: u64) -> String {
    // Largest-first so the first matching threshold wins.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GiB"), (1 << 20, "MiB"), (1 << 10, "KiB")];
    for (scale, suffix) in UNITS {
        if bytes >= scale {
            return format!("{:.2} {}", bytes as f64 / scale as f64, suffix);
        }
    }
    // Below 1 KiB: print the raw count without decimals.
    format!("{} B", bytes)
}
183
184fn parse_region_id(s: &str) -> error::Result<RegionId> {
185 if s.contains(':') {
186 let parts: Vec<&str> = s.splitn(2, ':').collect();
187 let table_id: u32 = parts[0].parse().map_err(|e| {
188 error::IllegalConfigSnafu {
189 msg: format!("invalid table_id in region_id '{}': {}", s, e),
190 }
191 .build()
192 })?;
193 let region_num: u32 = parts[1].parse().map_err(|e| {
194 error::IllegalConfigSnafu {
195 msg: format!("invalid region_num in region_id '{}': {}", s, e),
196 }
197 .build()
198 })?;
199 Ok(RegionId::new(table_id, region_num))
200 } else {
201 let id: u64 = s.parse().map_err(|e| {
202 error::IllegalConfigSnafu {
203 msg: format!("invalid region_id '{}': {}", s, e),
204 }
205 .build()
206 })?;
207 Ok(RegionId::from_u64(id))
208 }
209}
210
211fn parse_path_type(s: &str) -> error::Result<PathType> {
212 match s.to_lowercase().as_str() {
213 "bare" => Ok(PathType::Bare),
214 "data" => Ok(PathType::Data),
215 "metadata" => Ok(PathType::Metadata),
216 _ => Err(error::IllegalConfigSnafu {
217 msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
218 }
219 .build()),
220 }
221}
222
/// Tree rewriter that casts comparison literals to the column's data type
/// so filter pushdown compares values of matching types.
struct LiteralTypeCaster {
    // Schema used to look up the column side's data type.
    schema: DFSchemaRef,
}
227
impl TreeNodeRewriter for LiteralTypeCaster {
    type Node = DfExpr;

    /// Bottom-up rewrite: for `col <cmp> literal` (either operand order),
    /// cast the literal to the column's type when the types differ.
    /// All other expressions pass through unchanged.
    fn f_up(&mut self, expr: DfExpr) -> datafusion_common::Result<Transformed<DfExpr>> {
        let DfExpr::BinaryExpr(BinaryExpr { left, op, right }) = &expr else {
            return Ok(Transformed::no(expr));
        };

        // Only plain comparison operators are handled.
        if !matches!(
            op,
            Operator::Eq
                | Operator::NotEq
                | Operator::Lt
                | Operator::LtEq
                | Operator::Gt
                | Operator::GtEq
        ) {
            return Ok(Transformed::no(expr));
        }

        // Accept column-vs-literal in either order; remember which side the
        // column is on so the rewritten expr keeps the original shape.
        let (col_expr, lit_expr, col_left) = match (left.as_ref(), right.as_ref()) {
            (col @ DfExpr::Column(_), lit @ DfExpr::Literal(_, _)) => (col, lit, true),
            (lit @ DfExpr::Literal(_, _), col @ DfExpr::Column(_)) => (col, lit, false),
            _ => return Ok(Transformed::no(expr)),
        };

        let col_type = col_expr.get_type(self.schema.as_ref())?;
        let DfExpr::Literal(scalar, _) = lit_expr else {
            // The match above guarantees lit_expr is a Literal.
            unreachable!()
        };

        // Types already agree: nothing to do.
        if scalar.data_type() == col_type {
            return Ok(Transformed::no(expr));
        }

        // Cast via a one-element Arrow array, then convert back to a scalar.
        let lit_array = scalar.to_array()?;
        let casted = compute::cast(lit_array.as_ref(), &col_type).map_err(|e| {
            datafusion_common::DataFusionError::Internal(format!(
                "Failed to cast literal {:?} to {:?}: {}",
                scalar, col_type, e
            ))
        })?;
        let casted_scalar = ScalarValue::try_from_array(&casted, 0)?;

        let new_lit = DfExpr::Literal(casted_scalar, None);
        let (new_left, new_right) = if col_left {
            (left.clone(), Box::new(new_lit))
        } else {
            (Box::new(new_lit), right.clone())
        };

        Ok(Transformed::yes(DfExpr::BinaryExpr(BinaryExpr {
            left: new_left,
            op: *op,
            right: new_right,
        })))
    }
}
286
287fn convert_literal_types(
288 exprs: Vec<DfExpr>,
289 schema: &DFSchemaRef,
290) -> datafusion_common::Result<Vec<DfExpr>> {
291 use datafusion_common::tree_node::TreeNode;
292
293 let mut caster = LiteralTypeCaster {
294 schema: schema.clone(),
295 };
296 exprs
297 .into_iter()
298 .map(|e| e.rewrite(&mut caster).map(|x| x.data))
299 .collect()
300}
301
/// Parses the scan config's filter strings into DataFusion logical
/// expressions against the region's schema.
///
/// Each filter string is parsed as a SQL expression, lowered to a logical
/// expr via a default SessionState, and finally literal types are cast to
/// the matching column types. Returns an empty vec when no filters are set.
fn resolve_filters(
    scan_config: &ScanConfig,
    metadata: &RegionMetadata,
) -> error::Result<Vec<DfExpr>> {
    let Some(filters) = &scan_config.filters else {
        return Ok(Vec::new());
    };

    // Region schema -> DataFusion schema used both for name resolution and
    // for literal type casting below.
    let df_schema = metadata
        .schema
        .arrow_schema()
        .clone()
        .to_dfschema()
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
            }
            .build()
        })?;

    // Minimal default session just to lower SQL exprs to logical exprs.
    let state = SessionStateBuilder::new()
        .with_config(Default::default())
        .with_runtime_env(Default::default())
        .with_default_features()
        .build();

    let exprs: Vec<DfExpr> = filters
        .iter()
        .enumerate()
        .map(|(idx, filter)| {
            let mut parser = SqlParser::new(&GenericDialect {})
                .try_with_sql(filter)
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                    }
                    .build()
                })?;

            let sql_expr = parser.parse_expr().map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                }
                .build()
            })?;

            state
                .create_logical_expr_from_sql_expr(
                    SqlExprWithAlias {
                        expr: sql_expr,
                        alias: None,
                    },
                    &df_schema,
                )
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!(
                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
                        ),
                    }
                    .build()
                })
        })
        .collect::<error::Result<Vec<_>>>()?;

    // Align literal types with column types (e.g. Int64 literal vs UInt32
    // column) so downstream pruning compares like with like.
    let df_schema_ref = Arc::new(df_schema);
    convert_literal_types(exprs, &df_schema_ref).map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to convert filter expression types: {e}"),
        }
        .build()
    })
}
375
/// Returns a partition-expr fetcher that always yields `None`; the
/// benchmark opens regions standalone and has no partition metadata source.
fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
    struct NoopPartitionExprFetcher;

    #[async_trait::async_trait]
    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
            None
        }
    }

    Arc::new(NoopPartitionExprFetcher)
}
388
// Everything needed to construct a MitoEngine except the log store, which
// is chosen later based on the WAL config.
struct EngineComponents {
    data_home: String,
    mito_config: MitoConfig,
    object_store_manager: Arc<ObjectStoreManager>,
    schema_metadata_manager: Arc<SchemaMetadataManager>,
    file_ref_manager: Arc<FileReferenceManager>,
    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
}
397
impl EngineComponents {
    /// Consumes the components and builds a [`MitoEngine`] on top of the
    /// given log store. Generic over the log-store implementation so the
    /// same components work for RaftEngine, Kafka, and Noop WALs.
    async fn build<S: store_api::logstore::LogStore>(
        self,
        log_store: Arc<S>,
    ) -> error::Result<MitoEngine> {
        MitoEngine::new(
            &self.data_home,
            self.mito_config,
            log_store,
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher,
            Plugins::default(),
        )
        .await
        .map_err(BoxedError::new)
        .context(error::BuildCliSnafu)
    }
}
418
419fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
420 let kv_backend = Arc::new(MemoryKvBackend::new());
421 let table_schema_cache = Arc::new(new_table_schema_cache(
422 "table_schema_name_cache".to_string(),
423 CacheBuilder::default().build(),
424 kv_backend.clone(),
425 ));
426 let schema_cache = Arc::new(new_schema_cache(
427 "schema_cache".to_string(),
428 CacheBuilder::default().build(),
429 kv_backend.clone(),
430 ));
431 Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
432}
433
impl ScanbenchCommand {
    /// Runs the scan benchmark end to end: builds a mito engine over the
    /// configured object store, opens the target region, then scans it for
    /// the requested number of iterations, printing per-iteration and
    /// aggregate row counts and sizes. On unix, can optionally capture a
    /// pprof flamegraph over the scan loop.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting scanbench...".cyan().bold());

        let region_id = parse_region_id(&self.region_id)?;
        let path_type = parse_path_type(&self.path_type)?;
        println!(
            "{} Region ID: {} (u64: {})",
            "✓".green(),
            self.region_id,
            region_id.as_u64()
        );

        // The datanode-style config file supplies store, mito, and WAL
        // settings in one place.
        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
        println!("{} Config parsed", "✓".green());

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        let object_store_manager =
            Arc::new(ObjectStoreManager::new("default", object_store.clone()));

        // The benchmark runs standalone: schema metadata comes from an
        // in-memory mock and partition exprs are never fetched.
        let schema_metadata_manager = mock_schema_metadata_manager();
        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
        let partition_expr_fetcher = noop_partition_expr_fetcher();

        let components = EngineComponents {
            data_home: store_cfg.data_home.clone(),
            mito_config,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
        };

        // Choose the log store from the WAL config. A real WAL is only wired
        // up when --enable-wal is set; otherwise a no-op store is used and
        // replay is skipped on open.
        let engine = match &wal_config {
            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
                let data_home = normalize_dir(&store_cfg.data_home);
                let wal_dir = match &raft_engine_config.dir {
                    Some(dir) => dir.clone(),
                    None => format!("{}{WAL_DIR}", data_home),
                };
                fs::create_dir_all(&wal_dir).await.map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
                    }
                    .build()
                })?;
                let log_store = Arc::new(
                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using RaftEngine WAL", "✓".green());
                components.build(log_store).await?
            }
            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
                let log_store = Arc::new(
                    KafkaLogStore::try_new(kafka_config, None)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using Kafka WAL", "✓".green());
                components.build(log_store).await?
            }
            _ => {
                let log_store = Arc::new(NoopLogStore);
                println!(
                    "{} Using NoopLogStore (enable_wal={})",
                    "✓".green(),
                    self.enable_wal
                );
                components.build(log_store).await?
            }
        };

        // Open the existing region from table_dir; WAL replay only happens
        // when --enable-wal is set.
        let open_request = RegionOpenRequest {
            engine: "mito".to_string(),
            table_dir: self.table_dir.clone(),
            path_type,
            options: HashMap::default(),
            skip_wal_replay: !self.enable_wal,
            checkpoint: None,
        };

        engine
            .handle_request(region_id, RegionRequest::Open(open_request))
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        println!("{} Region opened", "✓".green());

        // Optional JSON scan config controls projection, filters, and the
        // series row selector.
        let scan_config = if let Some(path) = &self.scan_config {
            let content = tokio::fs::read_to_string(path)
                .await
                .context(error::FileIoSnafu)?;
            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
        } else {
            ScanConfig::default()
        };
        let metadata = engine
            .get_metadata(region_id)
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        let projection = resolve_projection(&scan_config, Some(&metadata))?;
        let filters = resolve_filters(&scan_config, &metadata)?;

        // Map the CLI scanner name to a distribution hint; "seq" means no
        // hint (engine default ordering).
        let distribution = match self.scanner.as_str() {
            "seq" => None,
            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
            "series" => Some(TimeSeriesDistribution::PerSeries),
            other => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!(
                        "Unknown scanner type '{}', expected: seq, unordered, series",
                        other
                    ),
                }
                .build());
            }
        };

        let series_row_selector = match scan_config.series_row_selector.as_deref() {
            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
            Some(other) => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!("Unknown series_row_selector '{}'", other),
                }
                .build());
            }
            None => None,
        };

        println!(
            "{} Scanner: {}, Parallelism: {}, Iterations: {}",
            "ℹ".blue(),
            self.scanner,
            self.parallelism,
            self.iterations,
        );

        // Start profiling immediately unless we were asked to wait until the
        // warmup (first) iteration completes.
        #[cfg(unix)]
        let mut profiler_guard = if self.pprof_file.is_some() && !self.pprof_after_warmup {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("Failed to start profiler: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        // Aggregates across all iterations for the final summary.
        let mut total_rows_all = 0u64;
        let mut total_elapsed_all = std::time::Duration::ZERO;

        let projection_input = projection.map(ProjectionInput::new);
        for iteration in 0..self.iterations {
            // A fresh request per iteration; the resolved pieces are cloned
            // because ScanRequest takes ownership.
            let request = ScanRequest {
                projection_input: projection_input.clone(),
                filters: filters.clone(),
                series_row_selector,
                distribution,
                ..Default::default()
            };

            let start = Instant::now();

            let mut scanner = engine
                .handle_query(region_id, request)
                .await
                .map_err(BoxedError::new)
                .context(error::BuildCliSnafu)?;

            let original_partitions = scanner.properties().partitions.clone();
            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();

            if self.verbose {
                println!(
                    " {} Original partitions: {}, total ranges: {}",
                    "ℹ".blue(),
                    original_partitions.len(),
                    total_ranges
                );
            }

            // Redistribute scan ranges across the requested parallelism,
            // mirroring what the ParallelizeScan optimizer does in a query.
            if self.parallelism > 1 {
                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();

                let mut partitions =
                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);

                // Keep ranges in start order inside each partition.
                for partition in &mut partitions {
                    partition.sort_by_key(|a| a.start);
                }

                scanner
                    .prepare(
                        PrepareRequest::default()
                            .with_ranges(partitions)
                            .with_target_partitions(self.parallelism),
                    )
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;
            }

            let num_partitions = scanner.properties().partitions.len();
            let ctx = QueryScanContext {
                explain_verbose: self.verbose,
            };
            let metrics_set = ExecutionPlanMetricsSet::new();

            // One spawned task per partition; results are drained as they
            // complete so the slowest partition bounds the iteration time.
            let mut scan_futures = FuturesUnordered::new();

            for partition_idx in 0..num_partitions {
                let mut stream = scanner
                    .scan_partition(&ctx, &metrics_set, partition_idx)
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;

                scan_futures.push(tokio::spawn(async move {
                    let mut rows = 0u64;
                    let mut array_mem_size = 0u64;
                    let mut estimated_size = 0u64;
                    while let Some(batch_result) = stream.next().await {
                        match batch_result {
                            Ok(batch) => {
                                rows += batch.num_rows() as u64;
                                let df_batch = batch.df_record_batch();
                                // Track both the raw Arrow buffer size and
                                // mito's estimated in-memory size.
                                array_mem_size += df_batch.get_array_memory_size() as u64;
                                estimated_size +=
                                    mito2::memtable::record_batch_estimated_size(df_batch) as u64;
                            }
                            Err(e) => {
                                return Err(BoxedError::new(e));
                            }
                        }
                    }
                    Ok::<(u64, u64, u64), BoxedError>((rows, array_mem_size, estimated_size))
                }));
            }

            // Fold per-partition counters into iteration totals; a join
            // error (panicked/cancelled task) is surfaced separately from a
            // scan error.
            let mut total_rows = 0u64;
            let mut total_array_mem_size = 0u64;
            let mut total_estimated_size = 0u64;
            while let Some(task) = scan_futures.next().await {
                let result = task
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("scan task failed: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?;
                let (rows, array_mem_size, estimated_size) =
                    result.context(error::BuildCliSnafu)?;
                total_rows += rows;
                total_array_mem_size += array_mem_size;
                total_estimated_size += estimated_size;
            }

            let elapsed = start.elapsed();
            total_rows_all += total_rows;
            total_elapsed_all += elapsed;

            println!(
                " [iter {}] {} rows in {:?} ({} partitions), array_mem_size: {}, estimated_size: {}",
                iteration + 1,
                total_rows.to_string().cyan(),
                elapsed,
                num_partitions,
                format_bytes(total_array_mem_size),
                format_bytes(total_estimated_size),
            );

            // Deferred profiler start: kicks in right after the first
            // (warmup) iteration when --pprof-after-warmup is set.
            #[cfg(unix)]
            if iteration == 0
                && self.pprof_after_warmup
                && self.pprof_file.is_some()
                && profiler_guard.is_none()
            {
                println!(
                    "{} Starting profiling after warmup iteration...",
                    "⚡".yellow()
                );
                profiler_guard = Some(
                    pprof::ProfilerGuardBuilder::default()
                        .frequency(99)
                        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                        .build()
                        .map_err(|e| {
                            BoxedError::new(PlainError::new(
                                format!("Failed to start profiler: {e}"),
                                StatusCode::Unexpected,
                            ))
                        })
                        .context(error::BuildCliSnafu)?,
                );
            }
        }

        // Flamegraph generation failures are reported but do not fail the
        // benchmark run itself.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }

        if self.iterations > 1 {
            let avg_elapsed = total_elapsed_all / self.iterations as u32;
            let avg_rows = total_rows_all / self.iterations as u64;
            println!(
                "\n{} Average: {} rows in {:?} over {} iterations",
                "Summary".green().bold(),
                avg_rows.to_string().cyan(),
                avg_elapsed,
                self.iterations,
            );
        }

        println!("\n{}", "Benchmark completed!".green().bold());
        Ok(())
    }
}
819
#[cfg(test)]
mod tests {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use sqlparser::ast::{BinaryOperator, Expr};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::{ScanConfig, resolve_filters, resolve_projection};
    use crate::error;

    // ScanConfig deserialization: projection_names populated, projection not.
    #[test]
    fn test_parse_scan_config_projection_names() {
        let json = r#"{"projection_names":["host","ts"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.projection_names,
            Some(vec!["host".to_string(), "ts".to_string()])
        );
        assert_eq!(config.projection, None);
    }

    // Index-based projection is passed through verbatim, metadata not needed.
    #[test]
    fn test_resolve_projection_by_indexes() -> error::Result<()> {
        let config = ScanConfig {
            projection: Some(vec![0, 2]),
            projection_names: None,
            filters: None,
            series_row_selector: None,
        };

        let projection = resolve_projection(&config, None)?;
        assert_eq!(projection, Some(vec![0, 2]));
        Ok(())
    }

    // Name-based projection requires metadata; absence is an error.
    #[test]
    fn test_resolve_projection_by_names_without_metadata() {
        let config = ScanConfig {
            projection: None,
            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("Missing region metadata while resolving 'projection_names'")
        );
    }

    // Setting both projection fields at once is rejected.
    #[test]
    fn test_resolve_projection_conflict_fields() {
        let config = ScanConfig {
            projection: Some(vec![0]),
            projection_names: Some(vec!["host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("projection"));
        assert!(msg.contains("projection_names"));
    }

    // Sanity check that sqlparser parses a compound boolean expression.
    #[test]
    fn test_sqlparser_parse_expr_string() {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect)
            .try_with_sql("host = 'web-1' AND cpu > 80")
            .unwrap();

        let expr = parser.parse_expr().unwrap();

        match expr {
            Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
            other => panic!("expected BinaryOp, got: {other:?}"),
        }
    }

    // Integer literal compared to a UInt32 column must be cast to UInt32
    // by the LiteralTypeCaster pass inside resolve_filters.
    #[test]
    fn test_resolve_filters_uint32_type_conversion() {
        use api::v1::SemanticType;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = builder.build().unwrap();

        let config = ScanConfig {
            projection: None,
            projection_names: None,
            filters: Some(vec!["table_id = 1117".to_string()]),
            series_row_selector: None,
        };

        let exprs = resolve_filters(&config, &metadata).unwrap();
        assert_eq!(exprs.len(), 1);
        let expr_str = format!("{}", exprs[0]);
        assert!(
            expr_str.contains("UInt32(1117)"),
            "Expected UInt32(1117) in expression, got: {expr_str}"
        );
    }

    // ScanConfig deserialization: filter strings preserved in order.
    #[test]
    fn test_parse_scan_config_filters() {
        let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.filters,
            Some(vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()])
        );
    }
}