1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod context;
mod default_serializer;
pub mod options;
mod state;
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::{
    FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
use session::context::QueryContextRef;
use table::TableRef;

use crate::dataframe::DataFrame;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::planner::LogicalPlanner;
pub use crate::query_engine::context::QueryEngineContext;
pub use crate::query_engine::state::QueryEngineState;
use crate::region_query::RegionQueryHandlerRef;

/// The result of describing a statement, as returned by [`QueryEngine::describe`].
#[derive(Debug)]
pub struct DescribeResult {
    /// The schema of the described statement.
    pub schema: Schema,
    /// The logical plan of the described statement.
    pub logical_plan: LogicalPlan,
}

/// The interface of a query engine: it describes and executes [`LogicalPlan`]s and
/// accepts registration of user-defined, aggregate and SQL functions.
///
/// Implementations must be `Send + Sync`; they are typically shared through
/// an `Arc<dyn QueryEngine>`.
#[async_trait]
pub trait QueryEngine: Send + Sync {
    /// Returns the query engine as [`Any`]
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the logical planner used by this engine.
    fn planner(&self) -> Arc<dyn LogicalPlanner>;

    /// Returns the query engine name.
    fn name(&self) -> &str;

    /// Describe the given [`LogicalPlan`] under the given query context,
    /// yielding a [`DescribeResult`] (schema and logical plan) on success.
    async fn describe(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<DescribeResult>;

    /// Execute the given [`LogicalPlan`] under the given query context,
    /// yielding an [`Output`] on success.
    async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output>;

    /// Register a [`ScalarUdf`].
    fn register_udf(&self, udf: ScalarUdf);

    /// Register an aggregate function.
    ///
    /// # Panics
    /// Will panic if a function with the same name is already registered.
    fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);

    /// Register a SQL function.
    ///
    /// Overrides any function already registered under the same name.
    fn register_function(&self, func: FunctionRef);

    /// Create a [`DataFrame`] from a table.
    fn read_table(&self, table: TableRef) -> Result<DataFrame>;

    /// Create a [`QueryEngineContext`] for the given query context.
    fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext;

    /// Retrieve the query engine state, a [`QueryEngineState`].
    fn engine_state(&self) -> &QueryEngineState;
}

/// Builds a query engine once at construction time and hands out shared
/// references to it through [`QueryEngineFactory::query_engine`].
pub struct QueryEngineFactory {
    /// The engine created by the constructor, stored as a trait object.
    query_engine: Arc<dyn QueryEngine>,
}

impl QueryEngineFactory {
    pub fn new(
        catalog_manager: CatalogManagerRef,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
    ) -> Self {
        Self::new_with_plugins(
            catalog_manager,
            region_query_handler,
            table_mutation_handler,
            procedure_service_handler,
            flow_service_handler,
            with_dist_planner,
            Default::default(),
        )
    }

    pub fn new_with_plugins(
        catalog_manager: CatalogManagerRef,
        region_query_handler: Option<RegionQueryHandlerRef>,
        table_mutation_handler: Option<TableMutationHandlerRef>,
        procedure_service_handler: Option<ProcedureServiceHandlerRef>,
        flow_service_handler: Option<FlowServiceHandlerRef>,
        with_dist_planner: bool,
        plugins: Plugins,
    ) -> Self {
        let state = Arc::new(QueryEngineState::new(
            catalog_manager,
            region_query_handler,
            table_mutation_handler,
            procedure_service_handler,
            flow_service_handler,
            with_dist_planner,
            plugins.clone(),
        ));
        let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
        register_functions(&query_engine);
        Self { query_engine }
    }

    pub fn query_engine(&self) -> QueryEngineRef {
        self.query_engine.clone()
    }
}

/// Installs every built-in function exposed by [`FUNCTION_REGISTRY`] into the given engine.
///
/// SQL functions are registered first, followed by aggregate functions.
fn register_functions(query_engine: &Arc<DatafusionQueryEngine>) {
    FUNCTION_REGISTRY
        .functions()
        .into_iter()
        .for_each(|function| query_engine.register_function(function));

    FUNCTION_REGISTRY
        .aggregate_functions()
        .into_iter()
        .for_each(|aggregate| query_engine.register_aggregate_function(aggregate));
}

/// A shared, reference-counted handle to a [`QueryEngine`] trait object.
pub type QueryEngineRef = Arc<dyn QueryEngine>;

#[cfg(test)]
mod tests {
    use super::*;

    /// A factory built on an in-memory catalog manager, with every optional handler
    /// omitted, must hand out an engine that reports the name "datafusion".
    #[test]
    fn test_query_engine_factory() {
        let catalog_manager = catalog::memory::new_memory_catalog_manager().unwrap();
        let engine =
            QueryEngineFactory::new(catalog_manager, None, None, None, None, false).query_engine();

        assert_eq!("datafusion", engine.name());
    }
}