promql/extension_plan/
planner.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use datafusion::error::Result as DfResult;
19use datafusion::execution::context::SessionState;
20use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
21use datafusion::physical_plan::ExecutionPlan;
22use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
23
24use crate::extension_plan::{
25 EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
26 SeriesNormalize, UnionDistinctOn,
27};
28
/// Physical planner hook for the PromQL extension logical nodes.
///
/// Implements DataFusion's [`ExtensionPlanner`]; its `plan_extension` method
/// recognises the logical node types imported from `crate::extension_plan`
/// and asks each of them to build its own execution plan.
pub struct PromExtensionPlanner;
30
31#[async_trait]
32impl ExtensionPlanner for PromExtensionPlanner {
33 async fn plan_extension(
34 &self,
35 planner: &dyn PhysicalPlanner,
36 node: &dyn UserDefinedLogicalNode,
37 _logical_inputs: &[&LogicalPlan],
38 physical_inputs: &[Arc<dyn ExecutionPlan>],
39 session_state: &SessionState,
40 ) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
41 if let Some(node) = node.as_any().downcast_ref::<SeriesNormalize>() {
42 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
43 } else if let Some(node) = node.as_any().downcast_ref::<InstantManipulate>() {
44 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
45 } else if let Some(node) = node.as_any().downcast_ref::<RangeManipulate>() {
46 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
47 } else if let Some(node) = node.as_any().downcast_ref::<SeriesDivide>() {
48 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
49 } else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
50 Ok(Some(node.to_execution_plan(session_state, planner)?))
51 } else if let Some(node) = node.as_any().downcast_ref::<ScalarCalculate>() {
52 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())?))
53 } else if let Some(node) = node.as_any().downcast_ref::<HistogramFold>() {
54 Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
55 } else if let Some(node) = node.as_any().downcast_ref::<UnionDistinctOn>() {
56 Ok(Some(node.to_execution_plan(
57 physical_inputs[0].clone(),
58 physical_inputs[1].clone(),
59 )))
60 } else {
61 Ok(None)
62 }
63 }
64}