use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use clap::Parser;
use colored::Colorize;
use common_base::Plugins;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_wal::config::DatanodeWalConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::Expr as DfExpr;
use datafusion_common::ToDFSchema;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::sst::file_ref::FileReferenceManager;
use moka::future::CacheBuilder;
use object_store::manager::ObjectStoreManager;
use object_store::util::normalize_dir;
use query::optimizer::parallelize_scan::ParallelizeScan;
use serde::Deserialize;
use snafu::{OptionExt, ResultExt};
use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser as SqlParser;
use store_api::metadata::RegionMetadata;
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::fs;

use crate::datanode::objbench::{build_object_store, parse_config};
use crate::error;

/// Benchmarks scanning a single region with the Mito engine.
#[derive(Debug, Parser)]
pub struct ScanbenchCommand {
    /// Path to the datanode config file.
    #[clap(long, value_name = "FILE")]
    config: PathBuf,

    /// Region id to scan, either "<table_id>:<region_num>" or a plain u64.
    #[clap(long)]
    region_id: String,

    /// Directory of the table data under the object store.
    #[clap(long)]
    table_dir: String,

    /// Scanner type: "seq", "unordered" or "series".
    #[clap(long, default_value = "seq")]
    scanner: String,

    /// Optional JSON file with extra scan options (see `ScanConfig`).
    #[clap(long, value_name = "FILE")]
    scan_config: Option<PathBuf>,

    /// Number of partitions to scan in parallel.
    #[clap(long, default_value = "1")]
    parallelism: usize,

    /// Number of times to repeat the scan.
    #[clap(long, default_value = "1")]
    iterations: usize,

    /// Path type of the region directory: "bare", "data" or "metadata".
    #[clap(long, default_value = "bare")]
    path_type: String,

    /// Prints verbose logs.
    #[clap(short, long, default_value_t = false)]
    verbose: bool,

    /// Optional file to write a CPU flamegraph to (unix only).
    #[clap(long, value_name = "FILE")]
    pprof_file: Option<PathBuf>,

    /// Forces the scanner to use the flat format.
    #[clap(long, default_value_t = false)]
    force_flat_format: bool,

    /// Replays the WAL while opening the region.
    #[clap(long, default_value_t = false)]
    enable_wal: bool,
}

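/// Options for the scan, loaded from the `--scan-config` JSON file.
///
/// A minimal sample file (all keys are optional; the column names are
/// illustrative, not part of any fixed schema):
/// `{"projection_names":["host","ts"],"filters":["cpu > 80"],"series_row_selector":"last_row"}`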
#[derive(Debug, Deserialize, Default)]
struct ScanConfig {
    projection: Option<Vec<usize>>,
    projection_names: Option<Vec<String>>,
    filters: Option<Vec<String>>,
    series_row_selector: Option<String>,
}

/// Resolves the projected column indexes from the scan config.
///
/// `projection` and `projection_names` are mutually exclusive; resolving
/// names requires the region metadata.
fn resolve_projection(
    scan_config: &ScanConfig,
    metadata: Option<&RegionMetadata>,
) -> error::Result<Option<Vec<usize>>> {
    if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
        return Err(error::IllegalConfigSnafu {
            msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
        }
        .build());
    }

    if let Some(projection) = &scan_config.projection {
        return Ok(Some(projection.clone()));
    }

    if let Some(projection_names) = &scan_config.projection_names {
        let metadata = metadata.context(error::IllegalConfigSnafu {
            msg: "Missing region metadata while resolving 'projection_names'".to_string(),
        })?;
        let available_columns = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_schema.name.as_str())
            .collect::<Vec<_>>()
            .join(", ");
        let projection = projection_names
            .iter()
            .map(|name| {
                metadata
                    .column_index_by_name(name)
                    .with_context(|| error::IllegalConfigSnafu {
                        msg: format!(
                            "Unknown column '{}' in projection_names, available columns: [{}]",
                            name, available_columns
                        ),
                    })
            })
            .collect::<error::Result<Vec<_>>>()?;
        return Ok(Some(projection));
    }

    Ok(None)
}

/// Parses a region id from "<table_id>:<region_num>" or a plain u64 string.
fn parse_region_id(s: &str) -> error::Result<RegionId> {
    if s.contains(':') {
        let parts: Vec<&str> = s.splitn(2, ':').collect();
        let table_id: u32 = parts[0].parse().map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("invalid table_id in region_id '{}': {}", s, e),
            }
            .build()
        })?;
        let region_num: u32 = parts[1].parse().map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("invalid region_num in region_id '{}': {}", s, e),
            }
            .build()
        })?;
        Ok(RegionId::new(table_id, region_num))
    } else {
        let id: u64 = s.parse().map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("invalid region_id '{}': {}", s, e),
            }
            .build()
        })?;
        Ok(RegionId::from_u64(id))
    }
}

/// Parses the `--path-type` flag, case-insensitively.
fn parse_path_type(s: &str) -> error::Result<PathType> {
    match s.to_lowercase().as_str() {
        "bare" => Ok(PathType::Bare),
        "data" => Ok(PathType::Data),
        "metadata" => Ok(PathType::Metadata),
        _ => Err(error::IllegalConfigSnafu {
            msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
        }
        .build()),
    }
}

/// Parses the configured filter strings into DataFusion logical expressions
/// against the region schema.
fn resolve_filters(
    scan_config: &ScanConfig,
    metadata: &RegionMetadata,
) -> error::Result<Vec<DfExpr>> {
    let Some(filters) = &scan_config.filters else {
        return Ok(Vec::new());
    };

    let df_schema = metadata
        .schema
        .arrow_schema()
        .clone()
        .to_dfschema()
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
            }
            .build()
        })?;

    let state = SessionStateBuilder::new()
        .with_config(Default::default())
        .with_runtime_env(Default::default())
        .with_default_features()
        .build();

    filters
        .iter()
        .enumerate()
        .map(|(idx, filter)| {
            // Parse the raw string into a SQL expression first.
            let mut parser = SqlParser::new(&GenericDialect {})
                .try_with_sql(filter)
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                    }
                    .build()
                })?;

            let sql_expr = parser.parse_expr().map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
                }
                .build()
            })?;

            // Then plan it into a logical expression against the schema.
            state
                .create_logical_expr_from_sql_expr(
                    SqlExprWithAlias {
                        expr: sql_expr,
                        alias: None,
                    },
                    &df_schema,
                )
                .map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!(
                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
                        ),
                    }
                    .build()
                })
        })
        .collect()
}

/// Returns a partition expr fetcher that never finds an expression, which is
/// enough for a standalone benchmark.
fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
    struct NoopPartitionExprFetcher;

    #[async_trait::async_trait]
    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
            None
        }
    }

    Arc::new(NoopPartitionExprFetcher)
}

/// Everything needed to build a [MitoEngine] except the log store.
struct EngineComponents {
    data_home: String,
    mito_config: MitoConfig,
    object_store_manager: Arc<ObjectStoreManager>,
    schema_metadata_manager: Arc<SchemaMetadataManager>,
    file_ref_manager: Arc<FileReferenceManager>,
    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
}

impl EngineComponents {
    async fn build<S: store_api::logstore::LogStore>(
        self,
        log_store: Arc<S>,
    ) -> error::Result<MitoEngine> {
        MitoEngine::new(
            &self.data_home,
            self.mito_config,
            log_store,
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher,
            Plugins::default(),
        )
        .await
        .map_err(BoxedError::new)
        .context(error::BuildCliSnafu)
    }
}

/// Builds a [SchemaMetadataManager] backed by an in-memory kv store, since
/// the benchmark runs without a metasrv.
fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let table_schema_cache = Arc::new(new_table_schema_cache(
        "table_schema_name_cache".to_string(),
        CacheBuilder::default().build(),
        kv_backend.clone(),
    ));
    let schema_cache = Arc::new(new_schema_cache(
        "schema_cache".to_string(),
        CacheBuilder::default().build(),
        kv_backend.clone(),
    ));
    Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
}

impl ScanbenchCommand {
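    /// Runs the benchmark: builds a Mito engine from the config, opens the
    /// region, scans it `iterations` times and reports row counts and timings.
    ///
    /// A hypothetical invocation (the flags come from the struct above; the
    /// binary path, paths and ids are illustrative):
    /// `scanbench --config datanode.toml --region-id 1024:1 --table-dir data/greptime/public/1024/ --parallelism 8`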
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting scanbench...".cyan().bold());

        let region_id = parse_region_id(&self.region_id)?;
        let path_type = parse_path_type(&self.path_type)?;
        println!(
            "{} Region ID: {} (u64: {})",
            "✓".green(),
            self.region_id,
            region_id.as_u64()
        );

        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
        println!("{} Config parsed", "✓".green());

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        let object_store_manager =
            Arc::new(ObjectStoreManager::new("default", object_store.clone()));

        let schema_metadata_manager = mock_schema_metadata_manager();
        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
        let partition_expr_fetcher = noop_partition_expr_fetcher();

        let components = EngineComponents {
            data_home: store_cfg.data_home.clone(),
            mito_config,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
        };

        let engine = match &wal_config {
            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
                let data_home = normalize_dir(&store_cfg.data_home);
                let wal_dir = match &raft_engine_config.dir {
                    Some(dir) => dir.clone(),
                    None => format!("{}{WAL_DIR}", data_home),
                };
                fs::create_dir_all(&wal_dir).await.map_err(|e| {
                    error::IllegalConfigSnafu {
                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
                    }
                    .build()
                })?;
                let log_store = Arc::new(
                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using RaftEngine WAL", "✓".green());
                components.build(log_store).await?
            }
            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
                let log_store = Arc::new(
                    KafkaLogStore::try_new(kafka_config, None)
                        .await
                        .map_err(BoxedError::new)
                        .context(error::BuildCliSnafu)?,
                );
                println!("{} Using Kafka WAL", "✓".green());
                components.build(log_store).await?
            }
            _ => {
                let log_store = Arc::new(NoopLogStore);
                println!(
                    "{} Using NoopLogStore (enable_wal={})",
                    "✓".green(),
                    self.enable_wal
                );
                components.build(log_store).await?
            }
        };

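        // Open the region from the table dir. WAL replay is skipped unless
        // --enable-wal is set, matching the NoopLogStore default above.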
        let open_request = RegionOpenRequest {
            engine: "mito".to_string(),
            table_dir: self.table_dir.clone(),
            path_type,
            options: HashMap::default(),
            skip_wal_replay: !self.enable_wal,
            checkpoint: None,
        };

        engine
            .handle_request(region_id, RegionRequest::Open(open_request))
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        println!("{} Region opened", "✓".green());

        let scan_config = if let Some(path) = &self.scan_config {
            let content = tokio::fs::read_to_string(path)
                .await
                .context(error::FileIoSnafu)?;
            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
        } else {
            ScanConfig::default()
        };
        let metadata = engine
            .get_metadata(region_id)
            .await
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;
        let projection = resolve_projection(&scan_config, Some(&metadata))?;
        let filters = resolve_filters(&scan_config, &metadata)?;

        let distribution = match self.scanner.as_str() {
            "seq" => None,
            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
            "series" => Some(TimeSeriesDistribution::PerSeries),
            other => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!(
                        "Unknown scanner type '{}', expected: seq, unordered, series",
                        other
                    ),
                }
                .build());
            }
        };

        let series_row_selector = match scan_config.series_row_selector.as_deref() {
            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
            Some(other) => {
                return Err(error::IllegalConfigSnafu {
                    msg: format!("Unknown series_row_selector '{}'", other),
                }
                .build());
            }
            None => None,
        };

        println!(
            "{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}",
            "ℹ".blue(),
            self.scanner,
            self.parallelism,
            self.iterations,
            self.force_flat_format,
        );

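        // Optionally start a sampling CPU profiler; pprof is unix-only.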
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("Failed to start profiler: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let mut total_rows_all = 0u64;
        let mut total_elapsed_all = std::time::Duration::ZERO;

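        // Repeat the scan; each iteration builds a fresh request and scanner.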
        for iteration in 0..self.iterations {
            let request = ScanRequest {
                projection: projection.clone(),
                filters: filters.clone(),
                series_row_selector,
                distribution,
                force_flat_format: self.force_flat_format,
                ..Default::default()
            };

            let start = Instant::now();

            let mut scanner = engine
                .handle_query(region_id, request)
                .await
                .map_err(BoxedError::new)
                .context(error::BuildCliSnafu)?;

            let original_partitions = scanner.properties().partitions.clone();
            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();

            if self.verbose {
                println!(
                    " {} Original partitions: {}, total ranges: {}",
                    "ℹ".blue(),
                    original_partitions.len(),
                    total_ranges
                );
            }

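            // Flatten the planned ranges and reassign them across the target
            // parallelism, reusing the optimizer's ParallelizeScan logic.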
            if self.parallelism > 1 {
                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();

                let mut partitions =
                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);

                for partition in &mut partitions {
                    partition.sort_by(|a, b| a.start.cmp(&b.start));
                }

                scanner
                    .prepare(
                        PrepareRequest::default()
                            .with_ranges(partitions)
                            .with_target_partitions(self.parallelism),
                    )
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;
            }

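            // Scan every partition concurrently, one tokio task per partition.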
            let num_partitions = scanner.properties().partitions.len();
            let ctx = QueryScanContext::default();
            let metrics_set = ExecutionPlanMetricsSet::new();

            let mut scan_futures = FuturesUnordered::new();

            for partition_idx in 0..num_partitions {
                let mut stream = scanner
                    .scan_partition(&ctx, &metrics_set, partition_idx)
                    .map_err(BoxedError::new)
                    .context(error::BuildCliSnafu)?;

                scan_futures.push(tokio::spawn(async move {
                    let mut rows = 0u64;
                    while let Some(batch_result) = stream.next().await {
                        match batch_result {
                            Ok(batch) => {
                                rows += batch.num_rows() as u64;
                            }
                            Err(e) => {
                                return Err(BoxedError::new(e));
                            }
                        }
                    }
                    Ok::<u64, BoxedError>(rows)
                }));
            }

            let mut total_rows = 0u64;
            while let Some(task) = scan_futures.next().await {
                let result = task
                    .map_err(|e| {
                        BoxedError::new(PlainError::new(
                            format!("scan task failed: {e}"),
                            StatusCode::Unexpected,
                        ))
                    })
                    .context(error::BuildCliSnafu)?;
                let rows = result.context(error::BuildCliSnafu)?;
                total_rows += rows;
            }

            let elapsed = start.elapsed();
            total_rows_all += total_rows;
            total_elapsed_all += elapsed;

            println!(
                " [iter {}] {} rows in {:?} ({} partitions)",
                iteration + 1,
                total_rows.to_string().cyan(),
                elapsed,
                num_partitions,
            );
        }

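        // Build the pprof report and write the flamegraph if profiling ran.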
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }

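        // Report averages when more than one iteration ran.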
        if self.iterations > 1 {
            let avg_elapsed = total_elapsed_all / self.iterations as u32;
            let avg_rows = total_rows_all / self.iterations as u64;
            println!(
                "\n{} Average: {} rows in {:?} over {} iterations",
                "Summary".green().bold(),
                avg_rows.to_string().cyan(),
                avg_elapsed,
                self.iterations,
            );
        }

        println!("\n{}", "Benchmark completed!".green().bold());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use sqlparser::ast::{BinaryOperator, Expr};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use super::{ScanConfig, resolve_projection};
    use crate::error;

    #[test]
    fn test_parse_scan_config_projection_names() {
        let json = r#"{"projection_names":["host","ts"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.projection_names,
            Some(vec!["host".to_string(), "ts".to_string()])
        );
        assert_eq!(config.projection, None);
    }

    #[test]
    fn test_resolve_projection_by_indexes() -> error::Result<()> {
        let config = ScanConfig {
            projection: Some(vec![0, 2]),
            projection_names: None,
            filters: None,
            series_row_selector: None,
        };

        let projection = resolve_projection(&config, None)?;
        assert_eq!(projection, Some(vec![0, 2]));
        Ok(())
    }

    #[test]
    fn test_resolve_projection_by_names_without_metadata() {
        let config = ScanConfig {
            projection: None,
            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("Missing region metadata while resolving 'projection_names'")
        );
    }

    #[test]
    fn test_resolve_projection_conflict_fields() {
        let config = ScanConfig {
            projection: Some(vec![0]),
            projection_names: Some(vec!["host".to_string()]),
            filters: None,
            series_row_selector: None,
        };

        let err = resolve_projection(&config, None).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("projection"));
        assert!(msg.contains("projection_names"));
    }
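
    // Exercises both accepted `--region-id` forms; a small sketch against the
    // `parse_region_id` helper defined in this module.
    #[test]
    fn test_parse_region_id_forms() {
        use store_api::storage::RegionId;

        assert_eq!(
            super::parse_region_id("1024:1").unwrap(),
            RegionId::new(1024, 1)
        );
        assert_eq!(
            super::parse_region_id("42").unwrap(),
            RegionId::from_u64(42)
        );
        assert!(super::parse_region_id("1024:x").is_err());
        assert!(super::parse_region_id("not-a-number").is_err());
    }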

    #[test]
    fn test_sqlparser_parse_expr_string() {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect)
            .try_with_sql("host = 'web-1' AND cpu > 80")
            .unwrap();

        let expr = parser.parse_expr().unwrap();

        match expr {
            Expr::BinaryOp { op, .. } => assert_eq!(op, BinaryOperator::And),
            other => panic!("expected BinaryOp, got: {other:?}"),
        }
    }
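
    // Checks the case-insensitive `--path-type` parsing and its error path;
    // `matches!` avoids assuming `PathType` implements `PartialEq`.
    #[test]
    fn test_parse_path_type_values() {
        use store_api::region_request::PathType;

        assert!(matches!(
            super::parse_path_type("BARE").unwrap(),
            PathType::Bare
        ));
        assert!(matches!(
            super::parse_path_type("data").unwrap(),
            PathType::Data
        ));
        assert!(matches!(
            super::parse_path_type("metadata").unwrap(),
            PathType::Metadata
        ));
        assert!(super::parse_path_type("unknown").is_err());
    }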

    #[test]
    fn test_parse_scan_config_filters() {
        let json = r#"{"filters":["host = 'web-1'","cpu > 80"]}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(
            config.filters,
            Some(vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()])
        );
    }
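
    // Shows the JSON shape of `series_row_selector`; "last_row" is the only
    // value `run` currently maps to a `TimeSeriesRowSelector`.
    #[test]
    fn test_parse_scan_config_series_row_selector() {
        let json = r#"{"series_row_selector":"last_row"}"#;
        let config: ScanConfig = serde_json::from_str(json).unwrap();

        assert_eq!(config.series_row_selector, Some("last_row".to_string()));
    }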
}