frontend/instance/
log_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26    AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
27    PipelineSnafu, Result as ServerResult,
28};
29use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
30use servers::query_handler::PipelineHandler;
31use session::context::{QueryContext, QueryContextRef};
32use snafu::ResultExt;
33use table::Table;
34
35use crate::instance::Instance;
36
37#[async_trait]
38impl PipelineHandler for Instance {
39    async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
40        self.plugins
41            .get::<PermissionCheckerRef>()
42            .as_ref()
43            .check_permission(ctx.current_user(), PermissionReq::LogWrite)
44            .context(AuthSnafu)?;
45
46        let log = self
47            .plugins
48            .get::<LogIngestInterceptorRef<ServerError>>()
49            .as_ref()
50            .pre_ingest(log, ctx.clone())?;
51
52        self.handle_log_inserts(log, ctx).await
53    }
54
55    async fn get_pipeline(
56        &self,
57        name: &str,
58        version: PipelineVersion,
59        query_ctx: QueryContextRef,
60    ) -> ServerResult<Arc<Pipeline>> {
61        self.pipeline_operator
62            .get_pipeline(query_ctx, name, version)
63            .await
64            .context(PipelineSnafu)
65    }
66
67    async fn insert_pipeline(
68        &self,
69        name: &str,
70        content_type: &str,
71        pipeline: &str,
72        query_ctx: QueryContextRef,
73    ) -> ServerResult<PipelineInfo> {
74        self.pipeline_operator
75            .insert_pipeline(name, content_type, pipeline, query_ctx)
76            .await
77            .context(PipelineSnafu)
78    }
79
80    async fn delete_pipeline(
81        &self,
82        name: &str,
83        version: PipelineVersion,
84        ctx: QueryContextRef,
85    ) -> ServerResult<Option<()>> {
86        self.pipeline_operator
87            .delete_pipeline(name, version, ctx)
88            .await
89            .context(PipelineSnafu)
90    }
91
92    async fn get_table(
93        &self,
94        table: &str,
95        query_ctx: &QueryContext,
96    ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
97        let catalog = query_ctx.current_catalog();
98        let schema = query_ctx.current_schema();
99        self.catalog_manager
100            .table(catalog, &schema, table, None)
101            .await
102    }
103
104    fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
105        PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
106    }
107
108    async fn get_pipeline_str(
109        &self,
110        name: &str,
111        version: PipelineVersion,
112        query_ctx: QueryContextRef,
113    ) -> ServerResult<(String, TimestampNanosecond)> {
114        self.pipeline_operator
115            .get_pipeline_str(name, version, query_ctx)
116            .await
117            .context(PipelineSnafu)
118    }
119}
120
121impl Instance {
122    pub async fn handle_log_inserts(
123        &self,
124        log: RowInsertRequests,
125        ctx: QueryContextRef,
126    ) -> ServerResult<Output> {
127        let _guard = if let Some(limiter) = &self.limiter {
128            let result = limiter.limit_row_inserts(&log);
129            if result.is_none() {
130                return InFlightWriteBytesExceededSnafu.fail();
131            }
132            result
133        } else {
134            None
135        };
136
137        self.inserter
138            .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
139            .await
140            .map_err(BoxedError::new)
141            .context(ExecuteGrpcRequestSnafu)
142    }
143
144    pub async fn handle_trace_inserts(
145        &self,
146        rows: RowInsertRequests,
147        ctx: QueryContextRef,
148    ) -> ServerResult<Output> {
149        let _guard = if let Some(limiter) = &self.limiter {
150            let result = limiter.limit_row_inserts(&rows);
151            if result.is_none() {
152                return InFlightWriteBytesExceededSnafu.fail();
153            }
154            result
155        } else {
156            None
157        };
158
159        self.inserter
160            .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
161            .await
162            .map_err(BoxedError::new)
163            .context(ExecuteGrpcRequestSnafu)
164    }
165}