frontend/instance/
logs.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
18use client::Output;
19use common_error::ext::BoxedError;
20use log_query::LogQuery;
21use server_error::Result as ServerResult;
22use servers::error::{self as server_error, AuthSnafu, ExecuteQuerySnafu};
23use servers::interceptor::{LogQueryInterceptor, LogQueryInterceptorRef};
24use servers::query_handler::LogQueryHandler;
25use session::context::{QueryContext, QueryContextRef};
26use snafu::ResultExt;
27use tonic::async_trait;
28
29use crate::instance::Instance;
30
31#[async_trait]
32impl LogQueryHandler for Instance {
33    async fn query(&self, mut request: LogQuery, ctx: QueryContextRef) -> ServerResult<Output> {
34        let interceptor = self
35            .plugins
36            .get::<LogQueryInterceptorRef<server_error::Error>>();
37
38        self.plugins
39            .get::<PermissionCheckerRef>()
40            .as_ref()
41            .check_permission(ctx.current_user(), PermissionReq::LogQuery)
42            .context(AuthSnafu)?;
43
44        interceptor.as_ref().pre_query(&request, ctx.clone())?;
45
46        request
47            .time_filter
48            .canonicalize()
49            .map_err(BoxedError::new)
50            .context(ExecuteQuerySnafu)?;
51
52        let plan = self
53            .query_engine
54            .planner()
55            .plan_logs_query(request, ctx.clone())
56            .await
57            .map_err(BoxedError::new)
58            .context(ExecuteQuerySnafu)?;
59
60        let output = self
61            .statement_executor
62            .exec_plan(plan, ctx.clone())
63            .await
64            .map_err(BoxedError::new)
65            .context(ExecuteQuerySnafu)?;
66
67        Ok(interceptor.as_ref().post_query(output, ctx.clone())?)
68    }
69
70    fn catalog_manager(&self, _ctx: &QueryContext) -> ServerResult<&dyn catalog::CatalogManager> {
71        Ok(self.catalog_manager.deref())
72    }
73}