frontend/instance/
log_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26    AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
27    Result as ServerResult,
28};
29use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
30use servers::query_handler::PipelineHandler;
31use session::context::{QueryContext, QueryContextRef};
32use snafu::ResultExt;
33use table::Table;
34
35use crate::instance::Instance;
36
37#[async_trait]
38impl PipelineHandler for Instance {
39    async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
40        self.plugins
41            .get::<PermissionCheckerRef>()
42            .as_ref()
43            .check_permission(ctx.current_user(), PermissionReq::LogWrite)
44            .context(AuthSnafu)?;
45
46        let log = self
47            .plugins
48            .get::<LogIngestInterceptorRef<ServerError>>()
49            .as_ref()
50            .pre_ingest(log, ctx.clone())?;
51
52        self.handle_log_inserts(log, ctx).await
53    }
54
55    async fn get_pipeline(
56        &self,
57        name: &str,
58        version: PipelineVersion,
59        query_ctx: QueryContextRef,
60    ) -> ServerResult<Arc<Pipeline>> {
61        self.pipeline_operator
62            .get_pipeline(query_ctx, name, version)
63            .await
64            .context(PipelineSnafu)
65    }
66
67    async fn insert_pipeline(
68        &self,
69        name: &str,
70        content_type: &str,
71        pipeline: &str,
72        query_ctx: QueryContextRef,
73    ) -> ServerResult<PipelineInfo> {
74        self.pipeline_operator
75            .insert_pipeline(name, content_type, pipeline, query_ctx)
76            .await
77            .context(PipelineSnafu)
78    }
79
80    async fn delete_pipeline(
81        &self,
82        name: &str,
83        version: PipelineVersion,
84        ctx: QueryContextRef,
85    ) -> ServerResult<Option<()>> {
86        self.pipeline_operator
87            .delete_pipeline(name, version, ctx)
88            .await
89            .context(PipelineSnafu)
90    }
91
92    async fn get_table(
93        &self,
94        table: &str,
95        query_ctx: &QueryContext,
96    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
97        let catalog = query_ctx.current_catalog();
98        let schema = query_ctx.current_schema();
99        self.catalog_manager
100            .table(catalog, &schema, table, None)
101            .await
102    }
103
104    fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
105        PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
106    }
107
108    async fn get_pipeline_str(
109        &self,
110        name: &str,
111        version: PipelineVersion,
112        query_ctx: QueryContextRef,
113    ) -> ServerResult<(String, TimestampNanosecond)> {
114        self.pipeline_operator
115            .get_pipeline_str(name, version, query_ctx)
116            .await
117            .context(PipelineSnafu)
118    }
119}
120
121impl Instance {
122    pub async fn handle_log_inserts(
123        &self,
124        log: RowInsertRequests,
125        ctx: QueryContextRef,
126    ) -> ServerResult<Output> {
127        let _guard = if let Some(limiter) = &self.limiter {
128            Some(
129                limiter
130                    .limit_row_inserts(&log)
131                    .await
132                    .map_err(BoxedError::new)
133                    .context(OtherSnafu)?,
134            )
135        } else {
136            None
137        };
138
139        self.inserter
140            .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
141            .await
142            .map_err(BoxedError::new)
143            .context(ExecuteGrpcRequestSnafu)
144    }
145
146    pub async fn handle_trace_inserts(
147        &self,
148        rows: RowInsertRequests,
149        ctx: QueryContextRef,
150    ) -> ServerResult<Output> {
151        let _guard = if let Some(limiter) = &self.limiter {
152            Some(
153                limiter
154                    .limit_row_inserts(&rows)
155                    .await
156                    .map_err(BoxedError::new)
157                    .context(OtherSnafu)?,
158            )
159        } else {
160            None
161        };
162
163        self.inserter
164            .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
165            .await
166            .map_err(BoxedError::new)
167            .context(ExecuteGrpcRequestSnafu)
168    }
169}