1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::Instant;
19
20use clap::Parser;
21use colored::Colorize;
22use common_base::Plugins;
23use common_error::ext::{BoxedError, PlainError};
24use common_error::status_code::StatusCode;
25use common_meta::cache::{new_schema_cache, new_table_schema_cache};
26use common_meta::key::SchemaMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_wal::config::DatanodeWalConfig;
29use datafusion::execution::SessionStateBuilder;
30use datafusion::logical_expr::{BinaryExpr, Expr as DfExpr, ExprSchemable, Operator};
31use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
32use datafusion_common::{DFSchemaRef, ScalarValue, ToDFSchema};
33use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
34use datatypes::arrow::compute;
35use futures::StreamExt;
36use futures::stream::FuturesUnordered;
37use log_store::kafka::log_store::KafkaLogStore;
38use log_store::noop::log_store::NoopLogStore;
39use log_store::raft_engine::log_store::RaftEngineLogStore;
40use mito2::config::MitoConfig;
41use mito2::engine::MitoEngine;
42use mito2::sst::file_ref::FileReferenceManager;
43use moka::future::CacheBuilder;
44use object_store::manager::ObjectStoreManager;
45use object_store::util::normalize_dir;
46use query::optimizer::parallelize_scan::ParallelizeScan;
47use serde::Deserialize;
48use snafu::{OptionExt, ResultExt};
49use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
50use sqlparser::dialect::GenericDialect;
51use sqlparser::parser::Parser as SqlParser;
52use store_api::metadata::RegionMetadata;
53use store_api::path_utils::WAL_DIR;
54use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
55use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
56use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
57use tokio::fs;
58
59use crate::datanode::objbench::{build_object_store, parse_config};
60use crate::error;
61
// Benchmarks scanning a single mito region directly on top of object storage,
// bypassing the frontend/query layers.
//
// NOTE(review): fields deliberately use plain `//` comments — `///` doc
// comments would be picked up by `#[derive(Parser)]` as CLI help text and
// change the observable `--help` output.
#[derive(Debug, Parser)]
pub struct ScanbenchCommand {
    // Path to the config file (object store + mito + WAL sections).
    #[clap(long, value_name = "FILE")]
    config: PathBuf,

    // Region to scan: either "<table_id>:<region_num>" or a raw u64 id.
    #[clap(long)]
    region_id: String,

    // Table directory of the region within the object store.
    #[clap(long)]
    table_dir: String,

    // Scanner kind: "seq", "unordered", or "series".
    #[clap(long, default_value = "seq")]
    scanner: String,

    // Optional JSON file with projection/filter/selector settings (ScanConfig).
    #[clap(long, value_name = "FILE")]
    scan_config: Option<PathBuf>,

    // Target number of partitions to scan concurrently.
    #[clap(long, default_value = "1")]
    parallelism: usize,

    // How many times the scan is repeated.
    #[clap(long, default_value = "1")]
    iterations: usize,

    // Region path layout: "bare", "data", or "metadata".
    #[clap(long, default_value = "bare")]
    path_type: String,

    // Enables verbose logging and extra per-iteration diagnostics.
    #[clap(short, long, default_value_t = false)]
    verbose: bool,

    // If set, a pprof flamegraph is written to this file (unix only).
    #[clap(long, value_name = "FILE")]
    pprof_file: Option<PathBuf>,

    // Opens the region with WAL replay using the configured WAL backend;
    // otherwise a no-op log store is used and replay is skipped.
    #[clap(long, default_value_t = false)]
    enable_wal: bool,

    // Delays profiler start until after the first (warmup) iteration.
    #[clap(long, default_value_t = false)]
    pprof_after_warmup: bool,
}
113
/// Optional scan parameters loaded from the `--scan-config` JSON file.
#[derive(Debug, Deserialize, Default)]
struct ScanConfig {
    // Column indexes to project; mutually exclusive with `projection_names`.
    projection: Option<Vec<usize>>,
    // Column names to project; resolved against the region metadata.
    projection_names: Option<Vec<String>>,
    // SQL filter expressions, parsed with a generic SQL dialect.
    filters: Option<Vec<String>>,
    // Per-series row selector; only "last_row" is accepted.
    series_row_selector: Option<String>,
}
122
123fn resolve_projection(
124 scan_config: &ScanConfig,
125 metadata: Option<&RegionMetadata>,
126) -> error::Result<Option<Vec<usize>>> {
127 if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
128 return Err(error::IllegalConfigSnafu {
129 msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
130 }
131 .build());
132 }
133
134 if let Some(projection) = &scan_config.projection {
135 return Ok(Some(projection.clone()));
136 }
137
138 if let Some(projection_names) = &scan_config.projection_names {
139 let metadata = metadata.context(error::IllegalConfigSnafu {
140 msg: "Missing region metadata while resolving 'projection_names'".to_string(),
141 })?;
142 let available_columns = metadata
143 .column_metadatas
144 .iter()
145 .map(|column| column.column_schema.name.as_str())
146 .collect::<Vec<_>>()
147 .join(", ");
148 let projection = projection_names
149 .iter()
150 .map(|name| {
151 metadata
152 .column_index_by_name(name)
153 .with_context(|| error::IllegalConfigSnafu {
154 msg: format!(
155 "Unknown column '{}' in projection_names, available columns: [{}]",
156 name, available_columns
157 ),
158 })
159 })
160 .collect::<error::Result<Vec<_>>>()?;
161 return Ok(Some(projection));
162 }
163
164 Ok(None)
165}
166
/// Renders a byte count as a human-readable string using binary units
/// (KiB/MiB/GiB) with two decimal places, or plain bytes below 1 KiB.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;
    const GIB: u64 = 1024 * MIB;
    // Pick the largest unit the value reaches; below 1 KiB print raw bytes.
    let (divisor, unit) = if bytes >= GIB {
        (GIB, "GiB")
    } else if bytes >= MIB {
        (MIB, "MiB")
    } else if bytes >= KIB {
        (KIB, "KiB")
    } else {
        return format!("{} B", bytes);
    };
    format!("{:.2} {}", bytes as f64 / divisor as f64, unit)
}
181
182fn parse_region_id(s: &str) -> error::Result<RegionId> {
183 if s.contains(':') {
184 let parts: Vec<&str> = s.splitn(2, ':').collect();
185 let table_id: u32 = parts[0].parse().map_err(|e| {
186 error::IllegalConfigSnafu {
187 msg: format!("invalid table_id in region_id '{}': {}", s, e),
188 }
189 .build()
190 })?;
191 let region_num: u32 = parts[1].parse().map_err(|e| {
192 error::IllegalConfigSnafu {
193 msg: format!("invalid region_num in region_id '{}': {}", s, e),
194 }
195 .build()
196 })?;
197 Ok(RegionId::new(table_id, region_num))
198 } else {
199 let id: u64 = s.parse().map_err(|e| {
200 error::IllegalConfigSnafu {
201 msg: format!("invalid region_id '{}': {}", s, e),
202 }
203 .build()
204 })?;
205 Ok(RegionId::from_u64(id))
206 }
207}
208
209fn parse_path_type(s: &str) -> error::Result<PathType> {
210 match s.to_lowercase().as_str() {
211 "bare" => Ok(PathType::Bare),
212 "data" => Ok(PathType::Data),
213 "metadata" => Ok(PathType::Metadata),
214 _ => Err(error::IllegalConfigSnafu {
215 msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
216 }
217 .build()),
218 }
219}
220
/// Tree-node rewriter that casts comparison literals to the type of the
/// column they are compared against (e.g. so `uint32_col = 1117` carries a
/// `UInt32` literal rather than the parser's default integer type).
struct LiteralTypeCaster {
    // Schema used to look up column types during the rewrite.
    schema: DFSchemaRef,
}
225
impl TreeNodeRewriter for LiteralTypeCaster {
    type Node = DfExpr;

    /// Bottom-up rewrite: for comparisons between a column and a literal,
    /// casts the literal to the column's schema type. Everything else is
    /// passed through unchanged.
    fn f_up(&mut self, expr: DfExpr) -> datafusion_common::Result<Transformed<DfExpr>> {
        // Only binary expressions can be column-vs-literal comparisons.
        let DfExpr::BinaryExpr(BinaryExpr { left, op, right }) = &expr else {
            return Ok(Transformed::no(expr));
        };

        // Restrict the rewrite to comparison operators.
        if !matches!(
            op,
            Operator::Eq
                | Operator::NotEq
                | Operator::Lt
                | Operator::LtEq
                | Operator::Gt
                | Operator::GtEq
        ) {
            return Ok(Transformed::no(expr));
        }

        // Accept the column on either side; remember which side it was on so
        // the operand order can be preserved when rebuilding the expression.
        let (col_expr, lit_expr, col_left) = match (left.as_ref(), right.as_ref()) {
            (col @ DfExpr::Column(_), lit @ DfExpr::Literal(_, _)) => (col, lit, true),
            (lit @ DfExpr::Literal(_, _), col @ DfExpr::Column(_)) => (col, lit, false),
            _ => return Ok(Transformed::no(expr)),
        };

        let col_type = col_expr.get_type(self.schema.as_ref())?;
        // Guaranteed by the match above: `lit_expr` is a literal.
        let DfExpr::Literal(scalar, _) = lit_expr else {
            unreachable!()
        };

        // Types already agree: nothing to rewrite.
        if scalar.data_type() == col_type {
            return Ok(Transformed::no(expr));
        }

        // Cast through a one-element arrow array, then convert back to a scalar.
        let lit_array = scalar.to_array()?;
        let casted = compute::cast(lit_array.as_ref(), &col_type).map_err(|e| {
            datafusion_common::DataFusionError::Internal(format!(
                "Failed to cast literal {:?} to {:?}: {}",
                scalar, col_type, e
            ))
        })?;
        let casted_scalar = ScalarValue::try_from_array(&casted, 0)?;

        // Rebuild the comparison with the casted literal in the original position.
        let new_lit = DfExpr::Literal(casted_scalar, None);
        let (new_left, new_right) = if col_left {
            (left.clone(), Box::new(new_lit))
        } else {
            (Box::new(new_lit), right.clone())
        };

        Ok(Transformed::yes(DfExpr::BinaryExpr(BinaryExpr {
            left: new_left,
            op: *op,
            right: new_right,
        })))
    }
}
284
285fn convert_literal_types(
286 exprs: Vec<DfExpr>,
287 schema: &DFSchemaRef,
288) -> datafusion_common::Result<Vec<DfExpr>> {
289 use datafusion_common::tree_node::TreeNode;
290
291 let mut caster = LiteralTypeCaster {
292 schema: schema.clone(),
293 };
294 exprs
295 .into_iter()
296 .map(|e| e.rewrite(&mut caster).map(|x| x.data))
297 .collect()
298}
299
/// Parses the configured SQL filter strings into DataFusion logical
/// expressions bound to the region schema, then casts comparison literals
/// to the compared column types.
fn resolve_filters(
    scan_config: &ScanConfig,
    metadata: &RegionMetadata,
) -> error::Result<Vec<DfExpr>> {
    // No filters configured: scan without predicates.
    let Some(filters) = &scan_config.filters else {
        return Ok(Vec::new());
    };

    // Region arrow schema -> DataFusion schema, needed for column name
    // resolution and type lookup.
    let df_schema = metadata
        .schema
        .arrow_schema()
        .clone()
        .to_dfschema()
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
            }
            .build()
        })?;

    // Minimal session state used only for SQL-expr -> logical-expr conversion.
    let state = SessionStateBuilder::new()
        .with_config(Default::default())
        .with_runtime_env(Default::default())
        .with_default_features()
        .build();

    let exprs: Vec<DfExpr> = filters
        .iter()
        .enumerate()
        .map(|(idx, filter)| {
            // Parse the raw string as a single SQL expression.
            let mut parser = SqlParser::new(&GenericDialect {})
                .try_with_sql(filter)
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                    }
                    .build()
                })?;

            let sql_expr = parser.parse_expr().map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                }
                .build()
            })?;

            // Bind column references against the region schema.
            state
                .create_logical_expr_from_sql_expr(
                    SqlExprWithAlias {
                        expr: sql_expr,
                        alias: None,
                    },
                    &df_schema,
                )
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!(
                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
                        ),
                    }
                    .build()
                })
        })
        .collect::<error::Result<Vec<_>>>()?;

    // Align literal types with column types (e.g. Int64 literal vs UInt32 column).
    let df_schema_ref = Arc::new(df_schema);
    convert_literal_types(exprs, &df_schema_ref).map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to convert filter expression types: {e}"),
        }
        .build()
    })
}
373
/// Returns a `PartitionExprFetcher` stub that always reports no partition
/// expression; scanbench opens regions standalone without a metasrv to query.
fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
    struct NoopPartitionExprFetcher;

    #[async_trait::async_trait]
    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
        // Always `None`: regions are treated as having no partition expression.
        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
            None
        }
    }

    Arc::new(NoopPartitionExprFetcher)
}
386
/// Everything needed to construct a `MitoEngine` except the log store,
/// which is chosen at runtime from the WAL configuration.
struct EngineComponents {
    // Root data directory of the engine.
    data_home: String,
    mito_config: MitoConfig,
    object_store_manager: Arc<ObjectStoreManager>,
    // In-memory stub in scanbench (see `mock_schema_metadata_manager`).
    schema_metadata_manager: Arc<SchemaMetadataManager>,
    file_ref_manager: Arc<FileReferenceManager>,
    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
}
395
impl EngineComponents {
    /// Consumes the bundled components and builds a `MitoEngine` on top of
    /// the given log store. Generic over the log store so any WAL backend
    /// (raft-engine, Kafka, noop) can be plugged in.
    async fn build<S: store_api::logstore::LogStore>(
        self,
        log_store: Arc<S>,
    ) -> error::Result<MitoEngine> {
        MitoEngine::new(
            &self.data_home,
            self.mito_config,
            log_store,
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher,
            Plugins::default(),
        )
        .await
        .map_err(BoxedError::new)
        .context(error::BuildCliSnafu)
    }
}
416
/// Builds a `SchemaMetadataManager` backed by an empty in-memory KV store;
/// sufficient for opening regions without a real metadata service.
fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let table_schema_cache = Arc::new(new_table_schema_cache(
        "table_schema_name_cache".to_string(),
        CacheBuilder::default().build(),
        kv_backend.clone(),
    ));
    let schema_cache = Arc::new(new_schema_cache(
        "schema_cache".to_string(),
        CacheBuilder::default().build(),
        kv_backend.clone(),
    ));
    Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
}
431
impl ScanbenchCommand {
    /// Runs the benchmark: builds a standalone mito engine, opens the target
    /// region, then scans it `iterations` times, reporting row counts,
    /// elapsed time, and batch memory sizes (optionally with pprof profiling).
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting scanbench...".cyan().bold());

        let region_id = parse_region_id(&self.region_id)?;
        let path_type = parse_path_type(&self.path_type)?;
        println!(
            "{} Region ID: {} (u64: {})",
            "✓".green(),
            self.region_id,
            region_id.as_u64()
        );

        // Load object store, mito, and WAL settings from the config file.
        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
        println!("{} Config parsed", "✓".green());

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        let object_store_manager =
            Arc::new(ObjectStoreManager::new("default", object_store.clone()));

        // Standalone run: schema metadata and partition expressions are
        // backed by in-memory stubs rather than a metasrv.
        let schema_metadata_manager = mock_schema_metadata_manager();
        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
        let partition_expr_fetcher = noop_partition_expr_fetcher();

        let components = EngineComponents {
            data_home: store_cfg.data_home.clone(),
            mito_config,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
        };

        // Choose the log store: a real WAL backend only when --enable-wal is
        // set; otherwise fall through to the no-op store.
        let engine = match &wal_config {
            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
                let data_home = normalize_dir(&store_cfg.data_home);
                // Default the WAL directory to <data_home>/<WAL_DIR> when unset.
                let wal_dir = match &raft_engine_config.dir {
                    Some(dir) => dir.clone(),
                    None => format!("{}{WAL_DIR}", data_home),
                };
                fs::create_dir_all(&wal_dir).await.map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
                    }
                    .build()
                })?;
                let log_store = Arc::new(
                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using RaftEngine WAL", "✓".green());
                components.build(log_store).await?
            }
            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
                let log_store = Arc::new(
                    KafkaLogStore::try_new(kafka_config, None)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using Kafka WAL", "✓".green());
                components.build(log_store).await?
            }
            _ => {
                let log_store = Arc::new(NoopLogStore);
                println!(
                    "{} Using NoopLogStore (enable_wal={})",
                    "✓".green(),
                    self.enable_wal
                );
                components.build(log_store).await?
            }
        };

        let open_request = RegionOpenRequest {
            engine: "mito".to_string(),
            table_dir: self.table_dir.clone(),
            path_type,
            options: HashMap::default(),
            // WAL replay only makes sense when a real WAL backend is in use.
            skip_wal_replay: !self.enable_wal,
            checkpoint: None,
        };

        engine
            .handle_request(region_id, RegionRequest::Open(open_request))
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        println!("{} Region opened", "✓".green());

        // Optional scan parameters (projection/filters/selector) from JSON.
        let scan_config = if let Some(path) = &self.scan_config {
            let content = tokio::fs::read_to_string(path)
                .await
                .context(error::FileIoSnafu)?;
            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
        } else {
            ScanConfig::default()
        };
        let metadata = engine
            .get_metadata(region_id)
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        let projection = resolve_projection(&scan_config, Some(&metadata))?;
        let filters = resolve_filters(&scan_config, &metadata)?;

        // Map the scanner name to a time-series distribution hint.
        let distribution = match self.scanner.as_str() {
            "seq" => None,
            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
            "series" => Some(TimeSeriesDistribution::PerSeries),
            other => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!(
                        "Unknown scanner type '{}', expected: seq, unordered, series",
                        other
                    ),
                }
                .build());
            }
        };

        let series_row_selector = match scan_config.series_row_selector.as_deref() {
            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
            Some(other) => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!("Unknown series_row_selector '{}'", other),
                }
                .build());
            }
            None => None,
        };

        println!(
            "{} Scanner: {}, Parallelism: {}, Iterations: {}",
            "ℹ".blue(),
            self.scanner,
            self.parallelism,
            self.iterations,
        );

        // Start profiling immediately unless deferred until after warmup.
        #[cfg(unix)]
        let mut profiler_guard = if self.pprof_file.is_some() && !self.pprof_after_warmup {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("Failed to start profiler: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let mut total_rows_all = 0u64;
        let mut total_elapsed_all = std::time::Duration::ZERO;

        for iteration in 0..self.iterations {
            let request = ScanRequest {
                projection: projection.clone(),
                filters: filters.clone(),
                series_row_selector,
                distribution,
                ..Default::default()
            };

            let start = Instant::now();

            let mut scanner = engine
                .handle_query(region_id, request)
                .await
                .map_err(BoxedError::new)
                .context(error::BuildCliSnafu)?;

            let original_partitions = scanner.properties().partitions.clone();
            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();

            if self.verbose {
                println!(
                    " {} Original partitions: {}, total ranges: {}",
                    "ℹ".blue(),
                    original_partitions.len(),
                    total_ranges
                );
            }

            // With higher parallelism, flatten all ranges and re-assign them
            // across `parallelism` partitions.
            if self.parallelism > 1 {
                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();

                let mut partitions =
                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);

                // Keep ranges ordered by start within each partition.
                for partition in &mut partitions {
                    partition.sort_by_key(|a| a.start);
                }

                scanner
                    .prepare(
                        PrepareRequest::default()
                            .with_ranges(partitions)
                            .with_target_partitions(self.parallelism),
                    )
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;
            }

            let num_partitions = scanner.properties().partitions.len();
            let ctx = QueryScanContext {
                explain_verbose: self.verbose,
            };
            let metrics_set = ExecutionPlanMetricsSet::new();

            let mut scan_futures = FuturesUnordered::new();

            // One task per partition; each drains its stream and accumulates
            // row and memory-size counters.
            for partition_idx in 0..num_partitions {
                let mut stream = scanner
                    .scan_partition(&ctx, &metrics_set, partition_idx)
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;

                scan_futures.push(tokio::spawn(async move {
                    let mut rows = 0u64;
                    let mut array_mem_size = 0u64;
                    let mut estimated_size = 0u64;
                    while let Some(batch_result) = stream.next().await {
                        match batch_result {
                            Ok(batch) => {
                                rows += batch.num_rows() as u64;
                                let df_batch = batch.df_record_batch();
                                array_mem_size += df_batch.get_array_memory_size() as u64;
                                estimated_size +=
                                    mito2::memtable::record_batch_estimated_size(df_batch) as u64;
                            }
                            Err(e) => {
                                return Err(BoxedError::new(e));
                            }
                        }
                    }
                    Ok::<(u64, u64, u64), BoxedError>((rows, array_mem_size, estimated_size))
                }));
            }

            // Collect per-partition results as the tasks finish; a join error
            // (panic/cancel) and a scan error are both surfaced as CLI errors.
            let mut total_rows = 0u64;
            let mut total_array_mem_size = 0u64;
            let mut total_estimated_size = 0u64;
            while let Some(task) = scan_futures.next().await {
                let result = task
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("scan task failed: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?;
                let (rows, array_mem_size, estimated_size) =
                    result.context(error::BuildCliSnafu)?;
                total_rows += rows;
                total_array_mem_size += array_mem_size;
                total_estimated_size += estimated_size;
            }

            let elapsed = start.elapsed();
            total_rows_all += total_rows;
            total_elapsed_all += elapsed;

            println!(
                " [iter {}] {} rows in {:?} ({} partitions), array_mem_size: {}, estimated_size: {}",
                iteration + 1,
                total_rows.to_string().cyan(),
                elapsed,
                num_partitions,
                format_bytes(total_array_mem_size),
                format_bytes(total_estimated_size),
            );

            // Deferred profiler start: begin after the first iteration so the
            // warmup is excluded from the profile.
            #[cfg(unix)]
            if iteration == 0
                && self.pprof_after_warmup
                && self.pprof_file.is_some()
                && profiler_guard.is_none()
            {
                println!(
                    "{} Starting profiling after warmup iteration...",
                    "⚡".yellow()
                );
                profiler_guard = Some(
                    pprof::ProfilerGuardBuilder::default()
                        .frequency(99)
                        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                        .build()
                        .map_err(|e| {
                            BoxedError::new(PlainError::new(
                                format!("Failed to start profiler: {e}"),
                                StatusCode::Unexpected,
                            ))
                        })
                        .context(error::BuildCliSnafu)?,
                );
            }
        }

        // Write out the flamegraph if profiling was active. Failures here are
        // reported but deliberately do not fail the benchmark.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }

        if self.iterations > 1 {
            let avg_elapsed = total_elapsed_all / self.iterations as u32;
            let avg_rows = total_rows_all / self.iterations as u64;
            println!(
                "\n{} Average: {} rows in {:?} over {} iterations",
                "Summary".green().bold(),
                avg_rows.to_string().cyan(),
                avg_elapsed,
                self.iterations,
            );
        }

        println!("\n{}", "Benchmark completed!".green().bold());
        Ok(())
    }
}
816
// Unit tests for scan-config parsing and projection/filter resolution.
#[cfg(test)]
mod tests {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use sqlparser::ast::{BinaryOperator, Expr};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::{ScanConfig, resolve_filters, resolve_projection};
    use crate::error;

    // JSON with `projection_names` only deserializes into that field alone.
    #[test]
    fn test_parse_scan_config_projection_names() {
        let json = r#"{"projection_names":["host","ts"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.projection_names,
            Some(vec!["host".to_string(), "ts".to_string()])
        );
        assert_eq!(config.projection, None);
    }

    // Explicit indexes pass through untouched, even without metadata.
    #[test]
    fn test_resolve_projection_by_indexes() -> error::Result<()> {
        let config = ScanConfig {
            projection: Some(vec![0, 2]),
            projection_names: None,
            filters: None,
            series_row_selector: None,
        };

        let projection = resolve_projection(&config, None)?;
        assert_eq!(projection, Some(vec![0, 2]));
        Ok(())
    }

    // Name-based projection requires region metadata to resolve indexes.
    #[test]
    fn test_resolve_projection_by_names_without_metadata() {
        let config = ScanConfig {
            projection: None,
            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("Missing region metadata while resolving 'projection_names'")
        );
    }

    // Supplying both projection forms at once is rejected.
    #[test]
    fn test_resolve_projection_conflict_fields() {
        let config = ScanConfig {
            projection: Some(vec![0]),
            projection_names: Some(vec!["host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("projection"));
        assert!(msg.contains("projection_names"));
    }

    // Sanity check: the generic dialect parses a compound boolean expression.
    #[test]
    fn test_sqlparser_parse_expr_string() {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect)
            .try_with_sql("host = 'web-1' AND cpu > 80")
            .unwrap();

        let expr = parser.parse_expr().unwrap();

        match expr {
            Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
            other => panic!("expected BinaryOp, got: {other:?}"),
        }
    }

    // Literal in a filter against a UInt32 column must be cast to UInt32
    // (exercises LiteralTypeCaster via resolve_filters).
    #[test]
    fn test_resolve_filters_uint32_type_conversion() {
        use api::v1::SemanticType;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = builder.build().unwrap();

        let config = ScanConfig {
            projection: None,
            projection_names: None,
            filters: Some(vec!["table_id = 1117".to_string()]),
            series_row_selector: None,
        };

        let exprs = resolve_filters(&config, &metadata).unwrap();
        assert_eq!(exprs.len(), 1);
        let expr_str = format!("{}", exprs[0]);
        assert!(
            expr_str.contains("UInt32(1117)"),
            "Expected UInt32(1117) in expression, got: {expr_str}"
        );
    }

    // JSON filter strings deserialize verbatim into `filters`.
    #[test]
    fn test_parse_scan_config_filters() {
        let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.filters,
            Some(vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()])
        );
    }
}