frontend/instance/
log_handler.rs1use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26 AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
27 PipelineSnafu, Result as ServerResult,
28};
29use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
30use servers::query_handler::PipelineHandler;
31use session::context::{QueryContext, QueryContextRef};
32use snafu::ResultExt;
33use table::Table;
34
35use crate::instance::Instance;
36
37#[async_trait]
38impl PipelineHandler for Instance {
39 async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
40 self.plugins
41 .get::<PermissionCheckerRef>()
42 .as_ref()
43 .check_permission(ctx.current_user(), PermissionReq::LogWrite)
44 .context(AuthSnafu)?;
45
46 let log = self
47 .plugins
48 .get::<LogIngestInterceptorRef<ServerError>>()
49 .as_ref()
50 .pre_ingest(log, ctx.clone())?;
51
52 self.handle_log_inserts(log, ctx).await
53 }
54
55 async fn get_pipeline(
56 &self,
57 name: &str,
58 version: PipelineVersion,
59 query_ctx: QueryContextRef,
60 ) -> ServerResult<Arc<Pipeline>> {
61 self.pipeline_operator
62 .get_pipeline(query_ctx, name, version)
63 .await
64 .context(PipelineSnafu)
65 }
66
67 async fn insert_pipeline(
68 &self,
69 name: &str,
70 content_type: &str,
71 pipeline: &str,
72 query_ctx: QueryContextRef,
73 ) -> ServerResult<PipelineInfo> {
74 self.pipeline_operator
75 .insert_pipeline(name, content_type, pipeline, query_ctx)
76 .await
77 .context(PipelineSnafu)
78 }
79
80 async fn delete_pipeline(
81 &self,
82 name: &str,
83 version: PipelineVersion,
84 ctx: QueryContextRef,
85 ) -> ServerResult<Option<()>> {
86 self.pipeline_operator
87 .delete_pipeline(name, version, ctx)
88 .await
89 .context(PipelineSnafu)
90 }
91
92 async fn get_table(
93 &self,
94 table: &str,
95 query_ctx: &QueryContext,
96 ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
97 let catalog = query_ctx.current_catalog();
98 let schema = query_ctx.current_schema();
99 self.catalog_manager
100 .table(catalog, &schema, table, None)
101 .await
102 }
103
104 fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
105 PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
106 }
107
108 async fn get_pipeline_str(
109 &self,
110 name: &str,
111 version: PipelineVersion,
112 query_ctx: QueryContextRef,
113 ) -> ServerResult<(String, TimestampNanosecond)> {
114 self.pipeline_operator
115 .get_pipeline_str(name, version, query_ctx)
116 .await
117 .context(PipelineSnafu)
118 }
119}
120
121impl Instance {
122 pub async fn handle_log_inserts(
123 &self,
124 log: RowInsertRequests,
125 ctx: QueryContextRef,
126 ) -> ServerResult<Output> {
127 let _guard = if let Some(limiter) = &self.limiter {
128 let result = limiter.limit_row_inserts(&log);
129 if result.is_none() {
130 return InFlightWriteBytesExceededSnafu.fail();
131 }
132 result
133 } else {
134 None
135 };
136
137 self.inserter
138 .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
139 .await
140 .map_err(BoxedError::new)
141 .context(ExecuteGrpcRequestSnafu)
142 }
143
144 pub async fn handle_trace_inserts(
145 &self,
146 rows: RowInsertRequests,
147 ctx: QueryContextRef,
148 ) -> ServerResult<Output> {
149 let _guard = if let Some(limiter) = &self.limiter {
150 let result = limiter.limit_row_inserts(&rows);
151 if result.is_none() {
152 return InFlightWriteBytesExceededSnafu.fail();
153 }
154 result
155 } else {
156 None
157 };
158
159 self.inserter
160 .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
161 .await
162 .map_err(BoxedError::new)
163 .context(ExecuteGrpcRequestSnafu)
164 }
165}