frontend/instance/
log_handler.rs1use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26 AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
27 Result as ServerResult,
28};
29use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
30use servers::query_handler::PipelineHandler;
31use session::context::{QueryContext, QueryContextRef};
32use snafu::ResultExt;
33use table::Table;
34
35use crate::instance::Instance;
36
37#[async_trait]
38impl PipelineHandler for Instance {
39 async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
40 self.plugins
41 .get::<PermissionCheckerRef>()
42 .as_ref()
43 .check_permission(ctx.current_user(), PermissionReq::LogWrite)
44 .context(AuthSnafu)?;
45
46 let log = self
47 .plugins
48 .get::<LogIngestInterceptorRef<ServerError>>()
49 .as_ref()
50 .pre_ingest(log, ctx.clone())?;
51
52 self.handle_log_inserts(log, ctx).await
53 }
54
55 async fn get_pipeline(
56 &self,
57 name: &str,
58 version: PipelineVersion,
59 query_ctx: QueryContextRef,
60 ) -> ServerResult<Arc<Pipeline>> {
61 self.pipeline_operator
62 .get_pipeline(query_ctx, name, version)
63 .await
64 .context(PipelineSnafu)
65 }
66
67 async fn insert_pipeline(
68 &self,
69 name: &str,
70 content_type: &str,
71 pipeline: &str,
72 query_ctx: QueryContextRef,
73 ) -> ServerResult<PipelineInfo> {
74 self.pipeline_operator
75 .insert_pipeline(name, content_type, pipeline, query_ctx)
76 .await
77 .context(PipelineSnafu)
78 }
79
80 async fn delete_pipeline(
81 &self,
82 name: &str,
83 version: PipelineVersion,
84 ctx: QueryContextRef,
85 ) -> ServerResult<Option<()>> {
86 self.pipeline_operator
87 .delete_pipeline(name, version, ctx)
88 .await
89 .context(PipelineSnafu)
90 }
91
92 async fn get_table(
93 &self,
94 table: &str,
95 query_ctx: &QueryContext,
96 ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
97 let catalog = query_ctx.current_catalog();
98 let schema = query_ctx.current_schema();
99 self.catalog_manager
100 .table(catalog, &schema, table, None)
101 .await
102 }
103
104 fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
105 PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
106 }
107
108 async fn get_pipeline_str(
109 &self,
110 name: &str,
111 version: PipelineVersion,
112 query_ctx: QueryContextRef,
113 ) -> ServerResult<(String, TimestampNanosecond)> {
114 self.pipeline_operator
115 .get_pipeline_str(name, version, query_ctx)
116 .await
117 .context(PipelineSnafu)
118 }
119}
120
121impl Instance {
122 pub async fn handle_log_inserts(
123 &self,
124 log: RowInsertRequests,
125 ctx: QueryContextRef,
126 ) -> ServerResult<Output> {
127 let _guard = if let Some(limiter) = &self.limiter {
128 Some(
129 limiter
130 .limit_row_inserts(&log)
131 .await
132 .map_err(BoxedError::new)
133 .context(OtherSnafu)?,
134 )
135 } else {
136 None
137 };
138
139 self.inserter
140 .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
141 .await
142 .map_err(BoxedError::new)
143 .context(ExecuteGrpcRequestSnafu)
144 }
145
146 pub async fn handle_trace_inserts(
147 &self,
148 rows: RowInsertRequests,
149 ctx: QueryContextRef,
150 ) -> ServerResult<Output> {
151 let _guard = if let Some(limiter) = &self.limiter {
152 Some(
153 limiter
154 .limit_row_inserts(&rows)
155 .await
156 .map_err(BoxedError::new)
157 .context(OtherSnafu)?,
158 )
159 } else {
160 None
161 };
162
163 self.inserter
164 .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
165 .await
166 .map_err(BoxedError::new)
167 .context(ExecuteGrpcRequestSnafu)
168 }
169}