servers/query_handler/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_query::Output;
20use datafusion_expr::LogicalPlan;
21use query::parser::PromQuery;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use sql::statements::statement::Statement;
25
26use crate::error::{self, Result};
27
28pub type SqlQueryHandlerRef<E> = Arc<dyn SqlQueryHandler<Error = E> + Send + Sync>;
29pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef<error::Error>;
30use query::query_engine::DescribeResult;
31
32#[async_trait]
33pub trait SqlQueryHandler {
34    type Error: ErrorExt;
35
36    async fn do_query(
37        &self,
38        query: &str,
39        query_ctx: QueryContextRef,
40    ) -> Vec<std::result::Result<Output, Self::Error>>;
41
42    async fn do_exec_plan(
43        &self,
44        stmt: Option<Statement>,
45        plan: LogicalPlan,
46        query_ctx: QueryContextRef,
47    ) -> std::result::Result<Output, Self::Error>;
48
49    async fn do_promql_query(
50        &self,
51        query: &PromQuery,
52        query_ctx: QueryContextRef,
53    ) -> Vec<std::result::Result<Output, Self::Error>>;
54
55    async fn do_describe(
56        &self,
57        stmt: Statement,
58        query_ctx: QueryContextRef,
59    ) -> std::result::Result<Option<DescribeResult>, Self::Error>;
60
61    async fn is_valid_schema(
62        &self,
63        catalog: &str,
64        schema: &str,
65    ) -> std::result::Result<bool, Self::Error>;
66}
67
68pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);
69
70impl<E> ServerSqlQueryHandlerAdapter<E> {
71    pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
72        Arc::new(Self(handler))
73    }
74}
75
76#[async_trait]
77impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
78where
79    E: ErrorExt + Send + Sync + 'static,
80{
81    type Error = error::Error;
82
83    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
84        self.0
85            .do_query(query, query_ctx)
86            .await
87            .into_iter()
88            .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
89            .collect()
90    }
91
92    async fn do_exec_plan(
93        &self,
94        stmt: Option<Statement>,
95        plan: LogicalPlan,
96        query_ctx: QueryContextRef,
97    ) -> Result<Output> {
98        self.0
99            .do_exec_plan(stmt, plan, query_ctx)
100            .await
101            .map_err(BoxedError::new)
102            .context(error::ExecutePlanSnafu)
103    }
104
105    async fn do_promql_query(
106        &self,
107        query: &PromQuery,
108        query_ctx: QueryContextRef,
109    ) -> Vec<Result<Output>> {
110        self.0
111            .do_promql_query(query, query_ctx)
112            .await
113            .into_iter()
114            .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
115            .collect()
116    }
117
118    async fn do_describe(
119        &self,
120        stmt: Statement,
121        query_ctx: QueryContextRef,
122    ) -> Result<Option<DescribeResult>> {
123        self.0
124            .do_describe(stmt, query_ctx)
125            .await
126            .map_err(BoxedError::new)
127            .context(error::DescribeStatementSnafu)
128    }
129
130    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
131        self.0
132            .is_valid_schema(catalog, schema)
133            .await
134            .map_err(BoxedError::new)
135            .context(error::CheckDatabaseValiditySnafu)
136    }
137}