cmd/datanode/
scanbench.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::Instant;
19
20use clap::Parser;
21use colored::Colorize;
22use common_base::Plugins;
23use common_error::ext::{BoxedError, PlainError};
24use common_error::status_code::StatusCode;
25use common_meta::cache::{new_schema_cache, new_table_schema_cache};
26use common_meta::key::SchemaMetadataManager;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_wal::config::DatanodeWalConfig;
29use datafusion::execution::SessionStateBuilder;
30use datafusion::logical_expr::Expr as DfExpr;
31use datafusion_common::ToDFSchema;
32use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
33use futures::StreamExt;
34use futures::stream::FuturesUnordered;
35use log_store::kafka::log_store::KafkaLogStore;
36use log_store::noop::log_store::NoopLogStore;
37use log_store::raft_engine::log_store::RaftEngineLogStore;
38use mito2::config::MitoConfig;
39use mito2::engine::MitoEngine;
40use mito2::sst::file_ref::FileReferenceManager;
41use moka::future::CacheBuilder;
42use object_store::manager::ObjectStoreManager;
43use object_store::util::normalize_dir;
44use query::optimizer::parallelize_scan::ParallelizeScan;
45use serde::Deserialize;
46use snafu::{OptionExt, ResultExt};
47use sqlparser::ast::ExprWithAlias as SqlExprWithAlias;
48use sqlparser::dialect::GenericDialect;
49use sqlparser::parser::Parser as SqlParser;
50use store_api::metadata::RegionMetadata;
51use store_api::path_utils::WAL_DIR;
52use store_api::region_engine::{PrepareRequest, QueryScanContext, RegionEngine};
53use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest};
54use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
55use tokio::fs;
56
57use crate::datanode::objbench::{build_object_store, parse_config};
58use crate::error;
59
60/// Scan benchmark command - benchmarks scanning a region directly from storage.
61#[derive(Debug, Parser)]
62pub struct ScanbenchCommand {
63    /// Path to config TOML file (same format as standalone/datanode config)
64    #[clap(long, value_name = "FILE")]
65    config: PathBuf,
66
67    /// Region ID: either numeric u64 (e.g. "4398046511104") or "table_id:region_num" (e.g. "1024:0")
68    #[clap(long)]
69    region_id: String,
70
71    /// Table directory relative to data home (e.g. "data/greptime/public/1024/")
72    #[clap(long)]
73    table_dir: String,
74
75    /// Scanner type: seq, unordered, series
76    #[clap(long, default_value = "seq")]
77    scanner: String,
78
79    /// Path to scan request JSON config file (optional)
80    #[clap(long, value_name = "FILE")]
81    scan_config: Option<PathBuf>,
82
83    /// Number of partitions for parallel scan (simulates parallelism)
84    #[clap(long, default_value = "1")]
85    parallelism: usize,
86
87    /// Number of iterations for benchmarking
88    #[clap(long, default_value = "1")]
89    iterations: usize,
90
91    /// Path type for the region: bare, data, metadata
92    #[clap(long, default_value = "bare")]
93    path_type: String,
94
95    /// Verbose output
96    #[clap(short, long, default_value_t = false)]
97    verbose: bool,
98
99    /// Output pprof flamegraph
100    #[clap(long, value_name = "FILE")]
101    pprof_file: Option<PathBuf>,
102
103    /// Force reading the region in flat format.
104    #[clap(long, default_value_t = false)]
105    force_flat_format: bool,
106
107    /// Enable WAL replay when opening the region.
108    #[clap(long, default_value_t = false)]
109    enable_wal: bool,
110}
111
112/// JSON config for scan request parameters.
113#[derive(Debug, Deserialize, Default)]
114struct ScanConfig {
115    projection: Option<Vec<usize>>,
116    projection_names: Option<Vec<String>>,
117    filters: Option<Vec<String>>,
118    series_row_selector: Option<String>,
119}
120
121fn resolve_projection(
122    scan_config: &ScanConfig,
123    metadata: Option<&RegionMetadata>,
124) -> error::Result<Option<Vec<usize>>> {
125    if scan_config.projection.is_some() && scan_config.projection_names.is_some() {
126        return Err(error::IllegalConfigSnafu {
127            msg: "scan config cannot contain both 'projection' and 'projection_names'".to_string(),
128        }
129        .build());
130    }
131
132    if let Some(projection) = &scan_config.projection {
133        return Ok(Some(projection.clone()));
134    }
135
136    if let Some(projection_names) = &scan_config.projection_names {
137        let metadata = metadata.context(error::IllegalConfigSnafu {
138            msg: "Missing region metadata while resolving 'projection_names'".to_string(),
139        })?;
140        let available_columns = metadata
141            .column_metadatas
142            .iter()
143            .map(|column| column.column_schema.name.as_str())
144            .collect::<Vec<_>>()
145            .join(", ");
146        let projection = projection_names
147            .iter()
148            .map(|name| {
149                metadata
150                    .column_index_by_name(name)
151                    .with_context(|| error::IllegalConfigSnafu {
152                        msg: format!(
153                            "Unknown column '{}' in projection_names, available columns: [{}]",
154                            name, available_columns
155                        ),
156                    })
157            })
158            .collect::<error::Result<Vec<_>>>()?;
159        return Ok(Some(projection));
160    }
161
162    Ok(None)
163}
164
165fn parse_region_id(s: &str) -> error::Result<RegionId> {
166    if s.contains(':') {
167        let parts: Vec<&str> = s.splitn(2, ':').collect();
168        let table_id: u32 = parts[0].parse().map_err(|e| {
169            error::IllegalConfigSnafu {
170                msg: format!("invalid table_id in region_id '{}': {}", s, e),
171            }
172            .build()
173        })?;
174        let region_num: u32 = parts[1].parse().map_err(|e| {
175            error::IllegalConfigSnafu {
176                msg: format!("invalid region_num in region_id '{}': {}", s, e),
177            }
178            .build()
179        })?;
180        Ok(RegionId::new(table_id, region_num))
181    } else {
182        let id: u64 = s.parse().map_err(|e| {
183            error::IllegalConfigSnafu {
184                msg: format!("invalid region_id '{}': {}", s, e),
185            }
186            .build()
187        })?;
188        Ok(RegionId::from_u64(id))
189    }
190}
191
192fn parse_path_type(s: &str) -> error::Result<PathType> {
193    match s.to_lowercase().as_str() {
194        "bare" => Ok(PathType::Bare),
195        "data" => Ok(PathType::Data),
196        "metadata" => Ok(PathType::Metadata),
197        _ => Err(error::IllegalConfigSnafu {
198            msg: format!("invalid path_type '{}', expected: bare, data, metadata", s),
199        }
200        .build()),
201    }
202}
203
204fn resolve_filters(
205    scan_config: &ScanConfig,
206    metadata: &RegionMetadata,
207) -> error::Result<Vec<DfExpr>> {
208    let Some(filters) = &scan_config.filters else {
209        return Ok(Vec::new());
210    };
211
212    let df_schema = metadata
213        .schema
214        .arrow_schema()
215        .clone()
216        .to_dfschema()
217        .map_err(|e| {
218            error::IllegalConfigSnafu {
219                msg: format!("Failed to convert region schema to DataFusion schema: {e}"),
220            }
221            .build()
222        })?;
223
224    let state = SessionStateBuilder::new()
225        .with_config(Default::default())
226        .with_runtime_env(Default::default())
227        .with_default_features()
228        .build();
229
230    filters
231        .iter()
232        .enumerate()
233        .map(|(idx, filter)| {
234            let mut parser = SqlParser::new(&GenericDialect {})
235                .try_with_sql(filter)
236                .map_err(|e| {
237                    error::IllegalConfigSnafu {
238                        msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
239                    }
240                    .build()
241                })?;
242
243            let sql_expr = parser.parse_expr().map_err(|e| {
244                error::IllegalConfigSnafu {
245                    msg: format!("Invalid filter at index {idx} ('{filter}'): {e}"),
246                }
247                .build()
248            })?;
249
250            state
251                .create_logical_expr_from_sql_expr(
252                    SqlExprWithAlias {
253                        expr: sql_expr,
254                        alias: None,
255                    },
256                    &df_schema,
257                )
258                .map_err(|e| {
259                    error::IllegalConfigSnafu {
260                        msg: format!(
261                            "Failed to convert filter at index {idx} ('{filter}') to logical expr: {e}"
262                        ),
263                    }
264                    .build()
265                })
266        })
267        .collect()
268}
269
270fn noop_partition_expr_fetcher() -> mito2::region::opener::PartitionExprFetcherRef {
271    struct NoopPartitionExprFetcher;
272
273    #[async_trait::async_trait]
274    impl mito2::region::opener::PartitionExprFetcher for NoopPartitionExprFetcher {
275        async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
276            None
277        }
278    }
279
280    Arc::new(NoopPartitionExprFetcher)
281}
282
283struct EngineComponents {
284    data_home: String,
285    mito_config: MitoConfig,
286    object_store_manager: Arc<ObjectStoreManager>,
287    schema_metadata_manager: Arc<SchemaMetadataManager>,
288    file_ref_manager: Arc<FileReferenceManager>,
289    partition_expr_fetcher: mito2::region::opener::PartitionExprFetcherRef,
290}
291
292impl EngineComponents {
293    async fn build<S: store_api::logstore::LogStore>(
294        self,
295        log_store: Arc<S>,
296    ) -> error::Result<MitoEngine> {
297        MitoEngine::new(
298            &self.data_home,
299            self.mito_config,
300            log_store,
301            self.object_store_manager,
302            self.schema_metadata_manager,
303            self.file_ref_manager,
304            self.partition_expr_fetcher,
305            Plugins::default(),
306        )
307        .await
308        .map_err(BoxedError::new)
309        .context(error::BuildCliSnafu)
310    }
311}
312
313fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
314    let kv_backend = Arc::new(MemoryKvBackend::new());
315    let table_schema_cache = Arc::new(new_table_schema_cache(
316        "table_schema_name_cache".to_string(),
317        CacheBuilder::default().build(),
318        kv_backend.clone(),
319    ));
320    let schema_cache = Arc::new(new_schema_cache(
321        "schema_cache".to_string(),
322        CacheBuilder::default().build(),
323        kv_backend.clone(),
324    ));
325    Arc::new(SchemaMetadataManager::new(table_schema_cache, schema_cache))
326}
327
328impl ScanbenchCommand {
329    pub async fn run(&self) -> error::Result<()> {
330        if self.verbose {
331            common_telemetry::init_default_ut_logging();
332        }
333
334        println!("{}", "Starting scanbench...".cyan().bold());
335
336        let region_id = parse_region_id(&self.region_id)?;
337        let path_type = parse_path_type(&self.path_type)?;
338        println!(
339            "{} Region ID: {} (u64: {})",
340            "✓".green(),
341            self.region_id,
342            region_id.as_u64()
343        );
344
345        // Parse config and build object store
346        let (store_cfg, mito_config, wal_config) = parse_config(&self.config)?;
347        println!("{} Config parsed", "✓".green());
348
349        let object_store = build_object_store(&store_cfg).await?;
350        println!("{} Object store initialized", "✓".green());
351
352        let object_store_manager =
353            Arc::new(ObjectStoreManager::new("default", object_store.clone()));
354
355        // Create mock dependencies
356        let schema_metadata_manager = mock_schema_metadata_manager();
357        let file_ref_manager = Arc::new(FileReferenceManager::new(None));
358        let partition_expr_fetcher = noop_partition_expr_fetcher();
359
360        // Create MitoEngine with appropriate log store
361        let components = EngineComponents {
362            data_home: store_cfg.data_home.clone(),
363            mito_config,
364            object_store_manager,
365            schema_metadata_manager,
366            file_ref_manager,
367            partition_expr_fetcher,
368        };
369
370        let engine = match &wal_config {
371            DatanodeWalConfig::RaftEngine(raft_engine_config) if self.enable_wal => {
372                let data_home = normalize_dir(&store_cfg.data_home);
373                let wal_dir = match &raft_engine_config.dir {
374                    Some(dir) => dir.clone(),
375                    None => format!("{}{WAL_DIR}", data_home),
376                };
377                fs::create_dir_all(&wal_dir).await.map_err(|e| {
378                    error::IllegalConfigSnafu {
379                        msg: format!("failed to create WAL directory {}: {e}", wal_dir),
380                    }
381                    .build()
382                })?;
383                let log_store = Arc::new(
384                    RaftEngineLogStore::try_new(wal_dir, raft_engine_config)
385                        .await
386                        .map_err(BoxedError::new)
387                        .context(error::BuildCliSnafu)?,
388                );
389                println!("{} Using RaftEngine WAL", "✓".green());
390                components.build(log_store).await?
391            }
392            DatanodeWalConfig::Kafka(kafka_config) if self.enable_wal => {
393                let log_store = Arc::new(
394                    KafkaLogStore::try_new(kafka_config, None)
395                        .await
396                        .map_err(BoxedError::new)
397                        .context(error::BuildCliSnafu)?,
398                );
399                println!("{} Using Kafka WAL", "✓".green());
400                components.build(log_store).await?
401            }
402            _ => {
403                let log_store = Arc::new(NoopLogStore);
404                println!(
405                    "{} Using NoopLogStore (enable_wal={})",
406                    "✓".green(),
407                    self.enable_wal
408                );
409                components.build(log_store).await?
410            }
411        };
412
413        // Open region
414        let open_request = RegionOpenRequest {
415            engine: "mito".to_string(),
416            table_dir: self.table_dir.clone(),
417            path_type,
418            options: HashMap::default(),
419            skip_wal_replay: !self.enable_wal,
420            checkpoint: None,
421        };
422
423        engine
424            .handle_request(region_id, RegionRequest::Open(open_request))
425            .await
426            .map_err(BoxedError::new)
427            .context(error::BuildCliSnafu)?;
428        println!("{} Region opened", "✓".green());
429
430        // Load scan config
431        let scan_config = if let Some(path) = &self.scan_config {
432            let content = tokio::fs::read_to_string(path)
433                .await
434                .context(error::FileIoSnafu)?;
435            serde_json::from_str::<ScanConfig>(&content).context(error::SerdeJsonSnafu)?
436        } else {
437            ScanConfig::default()
438        };
439        let metadata = engine
440            .get_metadata(region_id)
441            .await
442            .map_err(BoxedError::new)
443            .context(error::BuildCliSnafu)?;
444        let projection = resolve_projection(&scan_config, Some(&metadata))?;
445        let filters = resolve_filters(&scan_config, &metadata)?;
446
447        // Build scan request
448        let distribution = match self.scanner.as_str() {
449            "seq" => None,
450            "unordered" => Some(TimeSeriesDistribution::TimeWindowed),
451            "series" => Some(TimeSeriesDistribution::PerSeries),
452            other => {
453                return Err(error::IllegalConfigSnafu {
454                    msg: format!(
455                        "Unknown scanner type '{}', expected: seq, unordered, series",
456                        other
457                    ),
458                }
459                .build());
460            }
461        };
462
463        let series_row_selector = match scan_config.series_row_selector.as_deref() {
464            Some("last_row") => Some(TimeSeriesRowSelector::LastRow),
465            Some(other) => {
466                return Err(error::IllegalConfigSnafu {
467                    msg: format!("Unknown series_row_selector '{}'", other),
468                }
469                .build());
470            }
471            None => None,
472        };
473
474        println!(
475            "{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}",
476            "ℹ".blue(),
477            self.scanner,
478            self.parallelism,
479            self.iterations,
480            self.force_flat_format,
481        );
482
483        // Start profiling if pprof_file is specified
484        #[cfg(unix)]
485        let profiler_guard = if self.pprof_file.is_some() {
486            println!("{} Starting profiling...", "⚡".yellow());
487            Some(
488                pprof::ProfilerGuardBuilder::default()
489                    .frequency(99)
490                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
491                    .build()
492                    .map_err(|e| {
493                        BoxedError::new(PlainError::new(
494                            format!("Failed to start profiler: {e}"),
495                            StatusCode::Unexpected,
496                        ))
497                    })
498                    .context(error::BuildCliSnafu)?,
499            )
500        } else {
501            None
502        };
503
504        #[cfg(not(unix))]
505        if self.pprof_file.is_some() {
506            eprintln!(
507                "{}: Profiling is not supported on this platform",
508                "Warning".yellow()
509            );
510        }
511
512        let mut total_rows_all = 0u64;
513        let mut total_elapsed_all = std::time::Duration::ZERO;
514
515        for iteration in 0..self.iterations {
516            let request = ScanRequest {
517                projection: projection.clone(),
518                filters: filters.clone(),
519                series_row_selector,
520                distribution,
521                force_flat_format: self.force_flat_format,
522                ..Default::default()
523            };
524
525            let start = Instant::now();
526
527            // Get scanner
528            let mut scanner = engine
529                .handle_query(region_id, request)
530                .await
531                .map_err(BoxedError::new)
532                .context(error::BuildCliSnafu)?;
533
534            // Get partition ranges and apply parallelism
535            let original_partitions = scanner.properties().partitions.clone();
536            let total_ranges: usize = original_partitions.iter().map(|p| p.len()).sum();
537
538            if self.verbose {
539                println!(
540                    "  {} Original partitions: {}, total ranges: {}",
541                    "ℹ".blue(),
542                    original_partitions.len(),
543                    total_ranges
544                );
545            }
546
547            if self.parallelism > 1 {
548                // Flatten all ranges
549                let all_ranges: Vec<_> = original_partitions.into_iter().flatten().collect();
550
551                // Distribute ranges across partitions
552                let mut partitions =
553                    ParallelizeScan::assign_partition_range(all_ranges, self.parallelism);
554
555                // Sort ranges within each partition by start time ascending
556                for partition in &mut partitions {
557                    partition.sort_by(|a, b| a.start.cmp(&b.start));
558                }
559
560                scanner
561                    .prepare(
562                        PrepareRequest::default()
563                            .with_ranges(partitions)
564                            .with_target_partitions(self.parallelism),
565                    )
566                    .map_err(BoxedError::new)
567                    .context(error::BuildCliSnafu)?;
568            }
569
570            // Scan all partitions
571            let num_partitions = scanner.properties().partitions.len();
572            let ctx = QueryScanContext::default();
573            let metrics_set = ExecutionPlanMetricsSet::new();
574
575            let mut scan_futures = FuturesUnordered::new();
576
577            for partition_idx in 0..num_partitions {
578                let mut stream = scanner
579                    .scan_partition(&ctx, &metrics_set, partition_idx)
580                    .map_err(BoxedError::new)
581                    .context(error::BuildCliSnafu)?;
582
583                scan_futures.push(tokio::spawn(async move {
584                    let mut rows = 0u64;
585                    while let Some(batch_result) = stream.next().await {
586                        match batch_result {
587                            Ok(batch) => {
588                                rows += batch.num_rows() as u64;
589                            }
590                            Err(e) => {
591                                return Err(BoxedError::new(e));
592                            }
593                        }
594                    }
595                    Ok::<u64, BoxedError>(rows)
596                }));
597            }
598
599            let mut total_rows = 0u64;
600            while let Some(task) = scan_futures.next().await {
601                let result = task
602                    .map_err(|e| {
603                        BoxedError::new(PlainError::new(
604                            format!("scan task failed: {e}"),
605                            StatusCode::Unexpected,
606                        ))
607                    })
608                    .context(error::BuildCliSnafu)?;
609                let rows = result.context(error::BuildCliSnafu)?;
610                total_rows += rows;
611            }
612
613            let elapsed = start.elapsed();
614            total_rows_all += total_rows;
615            total_elapsed_all += elapsed;
616
617            println!(
618                "  [iter {}] {} rows in {:?} ({} partitions)",
619                iteration + 1,
620                total_rows.to_string().cyan(),
621                elapsed,
622                num_partitions,
623            );
624        }
625
626        // Stop profiling and generate flamegraph if enabled
627        #[cfg(unix)]
628        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
629            println!("{} Generating flamegraph...", "🔥".yellow());
630            match guard.report().build() {
631                Ok(report) => {
632                    let mut flamegraph_data = Vec::new();
633                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
634                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
635                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
636                        println!(
637                            "{}: Failed to write flamegraph to {}: {}",
638                            "Error".red(),
639                            pprof_file.display(),
640                            e
641                        );
642                    } else {
643                        println!(
644                            "{} Flamegraph saved to {}",
645                            "✓".green(),
646                            pprof_file.display().to_string().cyan()
647                        );
648                    }
649                }
650                Err(e) => {
651                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
652                }
653            }
654        }
655
656        // Summary
657        if self.iterations > 1 {
658            let avg_elapsed = total_elapsed_all / self.iterations as u32;
659            let avg_rows = total_rows_all / self.iterations as u64;
660            println!(
661                "\n{} Average: {} rows in {:?} over {} iterations",
662                "Summary".green().bold(),
663                avg_rows.to_string().cyan(),
664                avg_elapsed,
665                self.iterations,
666            );
667        }
668
669        println!("\n{}", "Benchmark completed!".green().bold());
670        Ok(())
671    }
672}
673
#[cfg(test)]
mod tests {
    use sqlparser::ast::{BinaryOperator, Expr};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use super::{ScanConfig, resolve_projection};
    use crate::error;

    /// `projection_names` is read from JSON and `projection` stays unset.
    #[test]
    fn test_parse_scan_config_projection_names() {
        let config: ScanConfig =
            serde_json::from_str(r#"{"projection_names":["host","ts"]}"#).unwrap();

        let expected_names = vec!["host".to_string(), "ts".to_string()];
        assert_eq!(config.projection_names, Some(expected_names));
        assert_eq!(config.projection, None);
    }

    /// Index projections are passed through without needing metadata.
    #[test]
    fn test_resolve_projection_by_indexes() -> error::Result<()> {
        let config = ScanConfig {
            projection: Some(vec![0, 2]),
            ..Default::default()
        };

        assert_eq!(resolve_projection(&config, None)?, Some(vec![0, 2]));
        Ok(())
    }

    /// Name projections cannot be resolved when no metadata is supplied.
    #[test]
    fn test_resolve_projection_by_names_without_metadata() {
        let config = ScanConfig {
            projection_names: Some(vec!["cpu".to_string(), "host".to_string()]),
            ..Default::default()
        };

        let message = resolve_projection(&config, None).unwrap_err().to_string();
        assert!(message.contains("Missing region metadata while resolving 'projection_names'"));
    }

    /// Setting both projection keys is rejected, and the error names both.
    #[test]
    fn test_resolve_projection_conflict_fields() {
        let config = ScanConfig {
            projection: Some(vec![0]),
            projection_names: Some(vec!["host".to_string()]),
            ..Default::default()
        };

        let message = resolve_projection(&config, None).unwrap_err().to_string();
        for needle in ["projection", "projection_names"] {
            assert!(message.contains(needle));
        }
    }

    /// A conjunction parses into a top-level `AND` binary operation.
    #[test]
    fn test_sqlparser_parse_expr_string() {
        let dialect = GenericDialect {};
        let parsed = Parser::new(&dialect)
            .try_with_sql("host = 'web-1' AND cpu > 80")
            .unwrap()
            .parse_expr()
            .unwrap();

        let op = match parsed {
            Expr::BinaryOp { op, .. } => op,
            other => panic!("expected BinaryOp, got: {other:?}"),
        };
        assert_eq!(op, BinaryOperator::And);
    }

    /// Filter strings are read from JSON verbatim and in order.
    #[test]
    fn test_parse_scan_config_filters() {
        let config: ScanConfig =
            serde_json::from_str(r#"{"filters":["host = 'web-1'","cpu > 80"]}"#).unwrap();

        let expected_filters = vec!["host = 'web-1'".to_string(), "cpu > 80".to_string()];
        assert_eq!(config.filters, Some(expected_filters));
    }
}