servers/query_handler/
sql.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_query::Output;
20use datafusion_expr::LogicalPlan;
21use query::parser::PromQuery;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use sql::statements::statement::Statement;
25
26use crate::error::{self, Result};
27
28pub type SqlQueryHandlerRef<E> = Arc<dyn SqlQueryHandler<Error = E> + Send + Sync>;
29pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef<error::Error>;
30use query::query_engine::DescribeResult;
31
32#[async_trait]
33pub trait SqlQueryHandler {
34 type Error: ErrorExt;
35
36 async fn do_query(
37 &self,
38 query: &str,
39 query_ctx: QueryContextRef,
40 ) -> Vec<std::result::Result<Output, Self::Error>>;
41
42 async fn do_exec_plan(
43 &self,
44 stmt: Option<Statement>,
45 plan: LogicalPlan,
46 query_ctx: QueryContextRef,
47 ) -> std::result::Result<Output, Self::Error>;
48
49 async fn do_promql_query(
50 &self,
51 query: &PromQuery,
52 query_ctx: QueryContextRef,
53 ) -> Vec<std::result::Result<Output, Self::Error>>;
54
55 async fn do_describe(
56 &self,
57 stmt: Statement,
58 query_ctx: QueryContextRef,
59 ) -> std::result::Result<Option<DescribeResult>, Self::Error>;
60
61 async fn is_valid_schema(
62 &self,
63 catalog: &str,
64 schema: &str,
65 ) -> std::result::Result<bool, Self::Error>;
66}
67
68pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);
69
70impl<E> ServerSqlQueryHandlerAdapter<E> {
71 pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
72 Arc::new(Self(handler))
73 }
74}
75
76#[async_trait]
77impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
78where
79 E: ErrorExt + Send + Sync + 'static,
80{
81 type Error = error::Error;
82
83 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
84 self.0
85 .do_query(query, query_ctx)
86 .await
87 .into_iter()
88 .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
89 .collect()
90 }
91
92 async fn do_exec_plan(
93 &self,
94 stmt: Option<Statement>,
95 plan: LogicalPlan,
96 query_ctx: QueryContextRef,
97 ) -> Result<Output> {
98 self.0
99 .do_exec_plan(stmt, plan, query_ctx)
100 .await
101 .map_err(BoxedError::new)
102 .context(error::ExecutePlanSnafu)
103 }
104
105 async fn do_promql_query(
106 &self,
107 query: &PromQuery,
108 query_ctx: QueryContextRef,
109 ) -> Vec<Result<Output>> {
110 self.0
111 .do_promql_query(query, query_ctx)
112 .await
113 .into_iter()
114 .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
115 .collect()
116 }
117
118 async fn do_describe(
119 &self,
120 stmt: Statement,
121 query_ctx: QueryContextRef,
122 ) -> Result<Option<DescribeResult>> {
123 self.0
124 .do_describe(stmt, query_ctx)
125 .await
126 .map_err(BoxedError::new)
127 .context(error::DescribeStatementSnafu)
128 }
129
130 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
131 self.0
132 .is_valid_schema(catalog, schema)
133 .await
134 .map_err(BoxedError::new)
135 .context(error::CheckDatabaseValiditySnafu)
136 }
137}