servers/
interceptor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::sync::Arc;
17
18use api::prom_store::remote::ReadRequest;
19use api::v1::greptime_request::Request;
20use api::v1::RowInsertRequests;
21use async_trait::async_trait;
22use common_error::ext::ErrorExt;
23use common_query::Output;
24use datafusion_expr::LogicalPlan;
25use log_query::LogQuery;
26use pipeline::PipelineMap;
27use query::parser::PromQuery;
28use session::context::QueryContextRef;
29use sql::statements::statement::Statement;
30
31/// SqlQueryInterceptor can track life cycle of a sql query and customize or
32/// abort its execution at given point.
33pub trait SqlQueryInterceptor {
34    type Error: ErrorExt;
35
36    /// Called before a query string is parsed into sql statements.
37    /// The implementation is allowed to change the sql string if needed.
38    fn pre_parsing<'a>(
39        &self,
40        query: &'a str,
41        _query_ctx: QueryContextRef,
42    ) -> Result<Cow<'a, str>, Self::Error> {
43        Ok(Cow::Borrowed(query))
44    }
45
46    /// Called after sql is parsed into statements. This interceptor is called
47    /// on each statement and the implementation can alter the statement or
48    /// abort execution by raising an error.
49    fn post_parsing(
50        &self,
51        statements: Vec<Statement>,
52        _query_ctx: QueryContextRef,
53    ) -> Result<Vec<Statement>, Self::Error> {
54        Ok(statements)
55    }
56
57    /// Called before sql is actually executed. This hook is not called at the moment.
58    fn pre_execute(
59        &self,
60        _statement: &Statement,
61        _plan: Option<&LogicalPlan>,
62        _query_ctx: QueryContextRef,
63    ) -> Result<(), Self::Error> {
64        Ok(())
65    }
66
67    /// Called after execution finished. The implementation can modify the
68    /// output if needed.
69    fn post_execute(
70        &self,
71        output: Output,
72        _query_ctx: QueryContextRef,
73    ) -> Result<Output, Self::Error> {
74        Ok(output)
75    }
76}
77
78pub type SqlQueryInterceptorRef<E> =
79    Arc<dyn SqlQueryInterceptor<Error = E> + Send + Sync + 'static>;
80
81impl<E> SqlQueryInterceptor for Option<&SqlQueryInterceptorRef<E>>
82where
83    E: ErrorExt,
84{
85    type Error = E;
86
87    fn pre_parsing<'a>(
88        &self,
89        query: &'a str,
90        query_ctx: QueryContextRef,
91    ) -> Result<Cow<'a, str>, Self::Error> {
92        if let Some(this) = self {
93            this.pre_parsing(query, query_ctx)
94        } else {
95            Ok(Cow::Borrowed(query))
96        }
97    }
98
99    fn post_parsing(
100        &self,
101        statements: Vec<Statement>,
102        query_ctx: QueryContextRef,
103    ) -> Result<Vec<Statement>, Self::Error> {
104        if let Some(this) = self {
105            this.post_parsing(statements, query_ctx)
106        } else {
107            Ok(statements)
108        }
109    }
110
111    fn pre_execute(
112        &self,
113        statement: &Statement,
114        plan: Option<&LogicalPlan>,
115        query_ctx: QueryContextRef,
116    ) -> Result<(), Self::Error> {
117        if let Some(this) = self {
118            this.pre_execute(statement, plan, query_ctx)
119        } else {
120            Ok(())
121        }
122    }
123
124    fn post_execute(
125        &self,
126        output: Output,
127        query_ctx: QueryContextRef,
128    ) -> Result<Output, Self::Error> {
129        if let Some(this) = self {
130            this.post_execute(output, query_ctx)
131        } else {
132            Ok(output)
133        }
134    }
135}
136
137/// GrpcQueryInterceptor can track life cycle of a grpc request and customize or
138/// abort its execution at given point.
139pub trait GrpcQueryInterceptor {
140    type Error: ErrorExt;
141
142    /// Called before request is actually executed.
143    fn pre_execute(
144        &self,
145        _request: &Request,
146        _query_ctx: QueryContextRef,
147    ) -> Result<(), Self::Error> {
148        Ok(())
149    }
150
151    /// Called after execution finished. The implementation can modify the
152    /// output if needed.
153    fn post_execute(
154        &self,
155        output: Output,
156        _query_ctx: QueryContextRef,
157    ) -> Result<Output, Self::Error> {
158        Ok(output)
159    }
160}
161
162pub type GrpcQueryInterceptorRef<E> =
163    Arc<dyn GrpcQueryInterceptor<Error = E> + Send + Sync + 'static>;
164
165impl<E> GrpcQueryInterceptor for Option<&GrpcQueryInterceptorRef<E>>
166where
167    E: ErrorExt,
168{
169    type Error = E;
170
171    fn pre_execute(
172        &self,
173        _request: &Request,
174        _query_ctx: QueryContextRef,
175    ) -> Result<(), Self::Error> {
176        if let Some(this) = self {
177            this.pre_execute(_request, _query_ctx)
178        } else {
179            Ok(())
180        }
181    }
182
183    fn post_execute(
184        &self,
185        output: Output,
186        _query_ctx: QueryContextRef,
187    ) -> Result<Output, Self::Error> {
188        if let Some(this) = self {
189            this.post_execute(output, _query_ctx)
190        } else {
191            Ok(output)
192        }
193    }
194}
195
196/// PromQueryInterceptor can track life cycle of a prometheus request and customize or
197/// abort its execution at given point.
198pub trait PromQueryInterceptor {
199    type Error: ErrorExt;
200
201    /// Called before request is actually executed.
202    fn pre_execute(
203        &self,
204        _query: &PromQuery,
205        _plan: Option<&LogicalPlan>,
206        _query_ctx: QueryContextRef,
207    ) -> Result<(), Self::Error> {
208        Ok(())
209    }
210
211    /// Called after execution finished. The implementation can modify the
212    /// output if needed.
213    fn post_execute(
214        &self,
215        output: Output,
216        _query_ctx: QueryContextRef,
217    ) -> Result<Output, Self::Error> {
218        Ok(output)
219    }
220}
221
222pub type PromQueryInterceptorRef<E> =
223    Arc<dyn PromQueryInterceptor<Error = E> + Send + Sync + 'static>;
224
225impl<E> PromQueryInterceptor for Option<PromQueryInterceptorRef<E>>
226where
227    E: ErrorExt,
228{
229    type Error = E;
230
231    fn pre_execute(
232        &self,
233        query: &PromQuery,
234        plan: Option<&LogicalPlan>,
235        query_ctx: QueryContextRef,
236    ) -> Result<(), Self::Error> {
237        if let Some(this) = self {
238            this.pre_execute(query, plan, query_ctx)
239        } else {
240            Ok(())
241        }
242    }
243
244    fn post_execute(
245        &self,
246        output: Output,
247        query_ctx: QueryContextRef,
248    ) -> Result<Output, Self::Error> {
249        if let Some(this) = self {
250            this.post_execute(output, query_ctx)
251        } else {
252            Ok(output)
253        }
254    }
255}
256
257/// LineProtocolInterceptor can track life cycle of a line protocol request
258/// and customize or abort its execution at given point.
259#[async_trait]
260pub trait LineProtocolInterceptor {
261    type Error: ErrorExt;
262
263    fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
264        Ok(())
265    }
266
267    /// Called after the lines are converted to the [RowInsertRequests].
268    /// We can then modify the resulting requests if needed.
269    /// Typically used in some backward compatibility situation.
270    async fn post_lines_conversion(
271        &self,
272        requests: RowInsertRequests,
273        query_context: QueryContextRef,
274    ) -> Result<RowInsertRequests, Self::Error> {
275        let _ = query_context;
276        Ok(requests)
277    }
278}
279
280pub type LineProtocolInterceptorRef<E> =
281    Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
282
283#[async_trait]
284impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
285    type Error = E;
286
287    fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
288        if let Some(this) = self {
289            this.pre_execute(line, query_ctx)
290        } else {
291            Ok(())
292        }
293    }
294
295    async fn post_lines_conversion(
296        &self,
297        requests: RowInsertRequests,
298        query_context: QueryContextRef,
299    ) -> Result<RowInsertRequests, Self::Error> {
300        if let Some(this) = self {
301            this.post_lines_conversion(requests, query_context).await
302        } else {
303            Ok(requests)
304        }
305    }
306}
307
308/// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request
309/// and customize or abort its execution at given point.
310pub trait OpenTelemetryProtocolInterceptor {
311    type Error: ErrorExt;
312
313    fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
314        Ok(())
315    }
316}
317
318pub type OpenTelemetryProtocolInterceptorRef<E> =
319    Arc<dyn OpenTelemetryProtocolInterceptor<Error = E> + Send + Sync + 'static>;
320
321impl<E: ErrorExt> OpenTelemetryProtocolInterceptor
322    for Option<OpenTelemetryProtocolInterceptorRef<E>>
323{
324    type Error = E;
325
326    fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
327        if let Some(this) = self {
328            this.pre_execute(query_ctx)
329        } else {
330            Ok(())
331        }
332    }
333}
334
335/// PromStoreProtocolInterceptor can track life cycle of a prom store request
336/// and customize or abort its execution at given point.
337pub trait PromStoreProtocolInterceptor {
338    type Error: ErrorExt;
339
340    fn pre_write(
341        &self,
342        _write_req: &RowInsertRequests,
343        _ctx: QueryContextRef,
344    ) -> Result<(), Self::Error> {
345        Ok(())
346    }
347
348    fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> {
349        Ok(())
350    }
351}
352
353pub type PromStoreProtocolInterceptorRef<E> =
354    Arc<dyn PromStoreProtocolInterceptor<Error = E> + Send + Sync + 'static>;
355
356impl<E: ErrorExt> PromStoreProtocolInterceptor for Option<PromStoreProtocolInterceptorRef<E>> {
357    type Error = E;
358
359    fn pre_write(
360        &self,
361        write_req: &RowInsertRequests,
362        ctx: QueryContextRef,
363    ) -> Result<(), Self::Error> {
364        if let Some(this) = self {
365            this.pre_write(write_req, ctx)
366        } else {
367            Ok(())
368        }
369    }
370
371    fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
372        if let Some(this) = self {
373            this.pre_read(read_req, ctx)
374        } else {
375            Ok(())
376        }
377    }
378}
379
380/// LogIngestInterceptor can track life cycle of a log ingestion request
381/// and customize or abort its execution at given point.
382pub trait LogIngestInterceptor {
383    type Error: ErrorExt;
384
385    /// Called before pipeline execution.
386    fn pre_pipeline(
387        &self,
388        values: Vec<PipelineMap>,
389        _query_ctx: QueryContextRef,
390    ) -> Result<Vec<PipelineMap>, Self::Error> {
391        Ok(values)
392    }
393
394    /// Called before insertion.
395    fn pre_ingest(
396        &self,
397        request: RowInsertRequests,
398        _query_ctx: QueryContextRef,
399    ) -> Result<RowInsertRequests, Self::Error> {
400        Ok(request)
401    }
402}
403
404pub type LogIngestInterceptorRef<E> =
405    Arc<dyn LogIngestInterceptor<Error = E> + Send + Sync + 'static>;
406
407impl<E> LogIngestInterceptor for Option<&LogIngestInterceptorRef<E>>
408where
409    E: ErrorExt,
410{
411    type Error = E;
412
413    fn pre_pipeline(
414        &self,
415        values: Vec<PipelineMap>,
416        query_ctx: QueryContextRef,
417    ) -> Result<Vec<PipelineMap>, Self::Error> {
418        if let Some(this) = self {
419            this.pre_pipeline(values, query_ctx)
420        } else {
421            Ok(values)
422        }
423    }
424
425    fn pre_ingest(
426        &self,
427        request: RowInsertRequests,
428        query_ctx: QueryContextRef,
429    ) -> Result<RowInsertRequests, Self::Error> {
430        if let Some(this) = self {
431            this.pre_ingest(request, query_ctx)
432        } else {
433            Ok(request)
434        }
435    }
436}
437
438/// LogQueryInterceptor can track life cycle of a log query request
439/// and customize or abort its execution at given point.
440pub trait LogQueryInterceptor {
441    type Error: ErrorExt;
442
443    /// Called before query is actually executed.
444    fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
445        Ok(())
446    }
447
448    /// Called after execution finished. The implementation can modify the
449    /// output if needed.
450    fn post_query(
451        &self,
452        output: Output,
453        _query_ctx: QueryContextRef,
454    ) -> Result<Output, Self::Error> {
455        Ok(output)
456    }
457}
458
459pub type LogQueryInterceptorRef<E> =
460    Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>;
461
462impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>>
463where
464    E: ErrorExt,
465{
466    type Error = E;
467
468    fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
469        if let Some(this) = self {
470            this.pre_query(query, query_ctx)
471        } else {
472            Ok(())
473        }
474    }
475
476    fn post_query(
477        &self,
478        output: Output,
479        query_ctx: QueryContextRef,
480    ) -> Result<Output, Self::Error> {
481        if let Some(this) = self {
482            this.post_query(output, query_ctx)
483        } else {
484            Ok(output)
485        }
486    }
487}