frontend/instance/
logs.rs1use std::ops::Deref;
16
17use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
18use client::Output;
19use common_error::ext::BoxedError;
20use log_query::LogQuery;
21use server_error::Result as ServerResult;
22use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
23use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
24use servers::query_handler::LogQueryHandler;
25use session::context::{QueryContext, QueryContextRef};
26use snafu::ResultExt;
27use tonic::async_trait;
28
29use crate::instance::Instance;
30
31#[async_trait]
32impl LogQueryHandler for Instance {
33 async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
34 let interceptor = self
35 .plugins
36 .get::<LogQueryInterceptorRef<server_error::Error>>();
37
38 self.plugins
39 .get::<PermissionCheckerRef>()
40 .as_ref()
41 .check_permission(ctx.current_user(), PermissionReq::LogQuery)
42 .context(AuthSnafu)?;
43
44 interceptor.as_ref().pre_query(&request, ctx.clone())?;
45
46 request
47 .time_filter
48 .canonicalize()
49 .map_err(BoxedError::new)
50 .context(ExecuteQuerySnafu)?;
51
52 let plan = self
53 .query_engine
54 .planner()
55 .plan_logs_query(request, ctx.clone())
56 .await
57 .map_err(BoxedError::new)
58 .context(ExecuteQuerySnafu)?;
59
60 let output = self
61 .statement_executor
62 .exec_plan(plan, ctx.clone())
63 .await
64 .map_err(BoxedError::new)
65 .context(ExecuteQuerySnafu)?;
66
67 Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
68 }
69
70 fn catalog_manager(&self, _ctx: &QueryContext) -> ServerResult<&dyn catalog::CatalogManager> {
71 Ok(self.catalog_manager.deref())
72 }
73}