servers/query_handler/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_query::Output;
20use datafusion_expr::LogicalPlan;
21use query::parser::PromQuery;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use sql::statements::statement::Statement;
25
26use crate::error::{self, Result};
27
28pub type SqlQueryHandlerRef<E> = Arc<dyn SqlQueryHandler<Error = E> + Send + Sync>;
29pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef<error::Error>;
30use query::query_engine::DescribeResult;
31
32#[async_trait]
33pub trait SqlQueryHandler {
34    type Error: ErrorExt;
35
36    async fn do_query(
37        &self,
38        query: &str,
39        query_ctx: QueryContextRef,
40    ) -> Vec<std::result::Result<Output, Self::Error>>;
41
42    async fn do_exec_plan(
43        &self,
44        plan: LogicalPlan,
45        query_ctx: QueryContextRef,
46    ) -> std::result::Result<Output, Self::Error>;
47
48    async fn do_promql_query(
49        &self,
50        query: &PromQuery,
51        query_ctx: QueryContextRef,
52    ) -> Vec<std::result::Result<Output, Self::Error>>;
53
54    async fn do_describe(
55        &self,
56        stmt: Statement,
57        query_ctx: QueryContextRef,
58    ) -> std::result::Result<Option<DescribeResult>, Self::Error>;
59
60    async fn is_valid_schema(
61        &self,
62        catalog: &str,
63        schema: &str,
64    ) -> std::result::Result<bool, Self::Error>;
65}
66
67pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);
68
69impl<E> ServerSqlQueryHandlerAdapter<E> {
70    pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
71        Arc::new(Self(handler))
72    }
73}
74
75#[async_trait]
76impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
77where
78    E: ErrorExt + Send + Sync + 'static,
79{
80    type Error = error::Error;
81
82    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
83        self.0
84            .do_query(query, query_ctx)
85            .await
86            .into_iter()
87            .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
88            .collect()
89    }
90
91    async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
92        self.0
93            .do_exec_plan(plan, query_ctx)
94            .await
95            .map_err(BoxedError::new)
96            .context(error::ExecutePlanSnafu)
97    }
98
99    async fn do_promql_query(
100        &self,
101        query: &PromQuery,
102        query_ctx: QueryContextRef,
103    ) -> Vec<Result<Output>> {
104        self.0
105            .do_promql_query(query, query_ctx)
106            .await
107            .into_iter()
108            .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
109            .collect()
110    }
111
112    async fn do_describe(
113        &self,
114        stmt: Statement,
115        query_ctx: QueryContextRef,
116    ) -> Result<Option<DescribeResult>> {
117        self.0
118            .do_describe(stmt, query_ctx)
119            .await
120            .map_err(BoxedError::new)
121            .context(error::DescribeStatementSnafu)
122    }
123
124    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
125        self.0
126            .is_valid_schema(catalog, schema)
127            .await
128            .map_err(BoxedError::new)
129            .context(error::CheckDatabaseValiditySnafu)
130    }
131}