servers/query_handler/
sql.rs1use std::sync::Arc;
16
17use async_trait::async_trait;
18use common_error::ext::{BoxedError, ErrorExt};
19use common_query::Output;
20use datafusion_expr::LogicalPlan;
21use query::parser::PromQuery;
22use session::context::QueryContextRef;
23use snafu::ResultExt;
24use sql::statements::statement::Statement;
25
26use crate::error::{self, Result};
27
28pub type SqlQueryHandlerRef<E> = Arc<dyn SqlQueryHandler<Error = E> + Send + Sync>;
29pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef<error::Error>;
30use query::query_engine::DescribeResult;
31
32#[async_trait]
33pub trait SqlQueryHandler {
34 type Error: ErrorExt;
35
36 async fn do_query(
37 &self,
38 query: &str,
39 query_ctx: QueryContextRef,
40 ) -> Vec<std::result::Result<Output, Self::Error>>;
41
42 async fn do_exec_plan(
43 &self,
44 plan: LogicalPlan,
45 query_ctx: QueryContextRef,
46 ) -> std::result::Result<Output, Self::Error>;
47
48 async fn do_promql_query(
49 &self,
50 query: &PromQuery,
51 query_ctx: QueryContextRef,
52 ) -> Vec<std::result::Result<Output, Self::Error>>;
53
54 async fn do_describe(
55 &self,
56 stmt: Statement,
57 query_ctx: QueryContextRef,
58 ) -> std::result::Result<Option<DescribeResult>, Self::Error>;
59
60 async fn is_valid_schema(
61 &self,
62 catalog: &str,
63 schema: &str,
64 ) -> std::result::Result<bool, Self::Error>;
65}
66
67pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);
68
69impl<E> ServerSqlQueryHandlerAdapter<E> {
70 pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
71 Arc::new(Self(handler))
72 }
73}
74
75#[async_trait]
76impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
77where
78 E: ErrorExt + Send + Sync + 'static,
79{
80 type Error = error::Error;
81
82 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
83 self.0
84 .do_query(query, query_ctx)
85 .await
86 .into_iter()
87 .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
88 .collect()
89 }
90
91 async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
92 self.0
93 .do_exec_plan(plan, query_ctx)
94 .await
95 .map_err(BoxedError::new)
96 .context(error::ExecutePlanSnafu)
97 }
98
99 async fn do_promql_query(
100 &self,
101 query: &PromQuery,
102 query_ctx: QueryContextRef,
103 ) -> Vec<Result<Output>> {
104 self.0
105 .do_promql_query(query, query_ctx)
106 .await
107 .into_iter()
108 .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
109 .collect()
110 }
111
112 async fn do_describe(
113 &self,
114 stmt: Statement,
115 query_ctx: QueryContextRef,
116 ) -> Result<Option<DescribeResult>> {
117 self.0
118 .do_describe(stmt, query_ctx)
119 .await
120 .map_err(BoxedError::new)
121 .context(error::DescribeStatementSnafu)
122 }
123
124 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
125 self.0
126 .is_valid_schema(catalog, schema)
127 .await
128 .map_err(BoxedError::new)
129 .context(error::CheckDatabaseValiditySnafu)
130 }
131}