frontend/instance/
log_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26    AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
27};
28use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
29use servers::query_handler::PipelineHandler;
30use session::context::{QueryContext, QueryContextRef};
31use snafu::ResultExt;
32use table::Table;
33
34use crate::instance::Instance;
35
36#[async_trait]
37impl PipelineHandler for Instance {
38    async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
39        self.plugins
40            .get::<PermissionCheckerRef>()
41            .as_ref()
42            .check_permission(ctx.current_user(), PermissionReq::LogWrite)
43            .context(AuthSnafu)?;
44
45        let log = self
46            .plugins
47            .get::<LogIngestInterceptorRef<ServerError>>()
48            .as_ref()
49            .pre_ingest(log, ctx.clone())?;
50
51        self.handle_log_inserts(log, ctx).await
52    }
53
54    async fn get_pipeline(
55        &self,
56        name: &str,
57        version: PipelineVersion,
58        query_ctx: QueryContextRef,
59    ) -> ServerResult<Arc<Pipeline>> {
60        self.pipeline_operator
61            .get_pipeline(query_ctx, name, version)
62            .await
63            .context(PipelineSnafu)
64    }
65
66    async fn insert_pipeline(
67        &self,
68        name: &str,
69        content_type: &str,
70        pipeline: &str,
71        query_ctx: QueryContextRef,
72    ) -> ServerResult<PipelineInfo> {
73        self.pipeline_operator
74            .insert_pipeline(name, content_type, pipeline, query_ctx)
75            .await
76            .context(PipelineSnafu)
77    }
78
79    async fn delete_pipeline(
80        &self,
81        name: &str,
82        version: PipelineVersion,
83        ctx: QueryContextRef,
84    ) -> ServerResult<Option<()>> {
85        self.pipeline_operator
86            .delete_pipeline(name, version, ctx)
87            .await
88            .context(PipelineSnafu)
89    }
90
91    async fn get_table(
92        &self,
93        table: &str,
94        query_ctx: &QueryContext,
95    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
96        let catalog = query_ctx.current_catalog();
97        let schema = query_ctx.current_schema();
98        self.catalog_manager
99            .table(catalog, &schema, table, None)
100            .await
101    }
102
103    fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
104        PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
105    }
106
107    async fn get_pipeline_str(
108        &self,
109        name: &str,
110        version: PipelineVersion,
111        query_ctx: QueryContextRef,
112    ) -> ServerResult<(String, TimestampNanosecond)> {
113        self.pipeline_operator
114            .get_pipeline_str(name, version, query_ctx)
115            .await
116            .context(PipelineSnafu)
117    }
118}
119
120impl Instance {
121    pub async fn handle_log_inserts(
122        &self,
123        log: RowInsertRequests,
124        ctx: QueryContextRef,
125    ) -> ServerResult<Output> {
126        self.inserter
127            .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
128            .await
129            .map_err(BoxedError::new)
130            .context(ExecuteGrpcRequestSnafu)
131    }
132
133    pub async fn handle_trace_inserts(
134        &self,
135        rows: RowInsertRequests,
136        ctx: QueryContextRef,
137    ) -> ServerResult<Output> {
138        self.inserter
139            .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
140            .await
141            .map_err(BoxedError::new)
142            .context(ExecuteGrpcRequestSnafu)
143    }
144}