servers/
interceptor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::borrow::Cow;
16use std::sync::Arc;
17
18use api::prom_store::remote::ReadRequest;
19use api::v1::RowInsertRequests;
20use api::v1::greptime_request::Request;
21use async_trait::async_trait;
22use common_error::ext::ErrorExt;
23use common_query::Output;
24use datafusion_expr::LogicalPlan;
25use log_query::LogQuery;
26use query::parser::PromQuery;
27use session::context::QueryContextRef;
28use sql::statements::statement::Statement;
29use table::TableRef;
30use vrl::value::Value;
31
32/// SqlQueryInterceptor can track life cycle of a sql query and customize or
33/// abort its execution at given point.
34pub trait SqlQueryInterceptor {
35    type Error: ErrorExt;
36
37    /// Called before a query string is parsed into sql statements.
38    /// The implementation is allowed to change the sql string if needed.
39    fn pre_parsing<'a>(
40        &self,
41        query: &'a str,
42        _query_ctx: QueryContextRef,
43    ) -> Result<Cow<'a, str>, Self::Error> {
44        Ok(Cow::Borrowed(query))
45    }
46
47    /// Called after sql is parsed into statements. This interceptor is called
48    /// on each statement and the implementation can alter the statement or
49    /// abort execution by raising an error.
50    fn post_parsing(
51        &self,
52        statements: Vec<Statement>,
53        _query_ctx: QueryContextRef,
54    ) -> Result<Vec<Statement>, Self::Error> {
55        Ok(statements)
56    }
57
58    /// Called before sql is actually executed. This hook is not called at the moment.
59    fn pre_execute(
60        &self,
61        _statement: &Statement,
62        _plan: Option<&LogicalPlan>,
63        _query_ctx: QueryContextRef,
64    ) -> Result<(), Self::Error> {
65        Ok(())
66    }
67
68    /// Called after execution finished. The implementation can modify the
69    /// output if needed.
70    fn post_execute(
71        &self,
72        output: Output,
73        _query_ctx: QueryContextRef,
74    ) -> Result<Output, Self::Error> {
75        Ok(output)
76    }
77}
78
79pub type SqlQueryInterceptorRef<E> =
80    Arc<dyn SqlQueryInterceptor<Error = E> + Send + Sync + 'static>;
81
82impl<E> SqlQueryInterceptor for Option<&SqlQueryInterceptorRef<E>>
83where
84    E: ErrorExt,
85{
86    type Error = E;
87
88    fn pre_parsing<'a>(
89        &self,
90        query: &'a str,
91        query_ctx: QueryContextRef,
92    ) -> Result<Cow<'a, str>, Self::Error> {
93        if let Some(this) = self {
94            this.pre_parsing(query, query_ctx)
95        } else {
96            Ok(Cow::Borrowed(query))
97        }
98    }
99
100    fn post_parsing(
101        &self,
102        statements: Vec<Statement>,
103        query_ctx: QueryContextRef,
104    ) -> Result<Vec<Statement>, Self::Error> {
105        if let Some(this) = self {
106            this.post_parsing(statements, query_ctx)
107        } else {
108            Ok(statements)
109        }
110    }
111
112    fn pre_execute(
113        &self,
114        statement: &Statement,
115        plan: Option<&LogicalPlan>,
116        query_ctx: QueryContextRef,
117    ) -> Result<(), Self::Error> {
118        if let Some(this) = self {
119            this.pre_execute(statement, plan, query_ctx)
120        } else {
121            Ok(())
122        }
123    }
124
125    fn post_execute(
126        &self,
127        output: Output,
128        query_ctx: QueryContextRef,
129    ) -> Result<Output, Self::Error> {
130        if let Some(this) = self {
131            this.post_execute(output, query_ctx)
132        } else {
133            Ok(output)
134        }
135    }
136}
137
138/// GrpcQueryInterceptor can track life cycle of a grpc request and customize or
139/// abort its execution at given point.
140pub trait GrpcQueryInterceptor {
141    type Error: ErrorExt;
142
143    /// Called before request is actually executed.
144    fn pre_execute(
145        &self,
146        _request: &Request,
147        _query_ctx: QueryContextRef,
148    ) -> Result<(), Self::Error> {
149        Ok(())
150    }
151
152    /// Called before bulk insert is executed.
153    fn pre_bulk_insert(
154        &self,
155        _table: TableRef,
156        _query_ctx: QueryContextRef,
157    ) -> Result<(), Self::Error> {
158        Ok(())
159    }
160
161    /// Called after execution finished. The implementation can modify the
162    /// output if needed.
163    fn post_execute(
164        &self,
165        output: Output,
166        _query_ctx: QueryContextRef,
167    ) -> Result<Output, Self::Error> {
168        Ok(output)
169    }
170}
171
172pub type GrpcQueryInterceptorRef<E> =
173    Arc<dyn GrpcQueryInterceptor<Error = E> + Send + Sync + 'static>;
174
175impl<E> GrpcQueryInterceptor for Option<&GrpcQueryInterceptorRef<E>>
176where
177    E: ErrorExt,
178{
179    type Error = E;
180
181    fn pre_execute(
182        &self,
183        _request: &Request,
184        _query_ctx: QueryContextRef,
185    ) -> Result<(), Self::Error> {
186        if let Some(this) = self {
187            this.pre_execute(_request, _query_ctx)
188        } else {
189            Ok(())
190        }
191    }
192
193    fn pre_bulk_insert(
194        &self,
195        _table: TableRef,
196        _query_ctx: QueryContextRef,
197    ) -> Result<(), Self::Error> {
198        if let Some(this) = self {
199            this.pre_bulk_insert(_table, _query_ctx)
200        } else {
201            Ok(())
202        }
203    }
204
205    fn post_execute(
206        &self,
207        output: Output,
208        _query_ctx: QueryContextRef,
209    ) -> Result<Output, Self::Error> {
210        if let Some(this) = self {
211            this.post_execute(output, _query_ctx)
212        } else {
213            Ok(output)
214        }
215    }
216}
217
218/// PromQueryInterceptor can track life cycle of a prometheus request and customize or
219/// abort its execution at given point.
220pub trait PromQueryInterceptor {
221    type Error: ErrorExt;
222
223    /// Called before request is actually executed.
224    fn pre_execute(
225        &self,
226        _query: &PromQuery,
227        _plan: Option<&LogicalPlan>,
228        _query_ctx: QueryContextRef,
229    ) -> Result<(), Self::Error> {
230        Ok(())
231    }
232
233    /// Called after execution finished. The implementation can modify the
234    /// output if needed.
235    fn post_execute(
236        &self,
237        output: Output,
238        _query_ctx: QueryContextRef,
239    ) -> Result<Output, Self::Error> {
240        Ok(output)
241    }
242}
243
244pub type PromQueryInterceptorRef<E> =
245    Arc<dyn PromQueryInterceptor<Error = E> + Send + Sync + 'static>;
246
247impl<E> PromQueryInterceptor for Option<PromQueryInterceptorRef<E>>
248where
249    E: ErrorExt,
250{
251    type Error = E;
252
253    fn pre_execute(
254        &self,
255        query: &PromQuery,
256        plan: Option<&LogicalPlan>,
257        query_ctx: QueryContextRef,
258    ) -> Result<(), Self::Error> {
259        if let Some(this) = self {
260            this.pre_execute(query, plan, query_ctx)
261        } else {
262            Ok(())
263        }
264    }
265
266    fn post_execute(
267        &self,
268        output: Output,
269        query_ctx: QueryContextRef,
270    ) -> Result<Output, Self::Error> {
271        if let Some(this) = self {
272            this.post_execute(output, query_ctx)
273        } else {
274            Ok(output)
275        }
276    }
277}
278
279/// LineProtocolInterceptor can track life cycle of a line protocol request
280/// and customize or abort its execution at given point.
281#[async_trait]
282pub trait LineProtocolInterceptor {
283    type Error: ErrorExt;
284
285    fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
286        Ok(())
287    }
288
289    /// Called after the lines are converted to the [RowInsertRequests].
290    /// We can then modify the resulting requests if needed.
291    /// Typically used in some backward compatibility situation.
292    async fn post_lines_conversion(
293        &self,
294        requests: RowInsertRequests,
295        query_context: QueryContextRef,
296    ) -> Result<RowInsertRequests, Self::Error> {
297        let _ = query_context;
298        Ok(requests)
299    }
300}
301
302pub type LineProtocolInterceptorRef<E> =
303    Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
304
305#[async_trait]
306impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
307    type Error = E;
308
309    fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
310        if let Some(this) = self {
311            this.pre_execute(line, query_ctx)
312        } else {
313            Ok(())
314        }
315    }
316
317    async fn post_lines_conversion(
318        &self,
319        requests: RowInsertRequests,
320        query_context: QueryContextRef,
321    ) -> Result<RowInsertRequests, Self::Error> {
322        if let Some(this) = self {
323            this.post_lines_conversion(requests, query_context).await
324        } else {
325            Ok(requests)
326        }
327    }
328}
329
330/// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request
331/// and customize or abort its execution at given point.
332pub trait OpenTelemetryProtocolInterceptor {
333    type Error: ErrorExt;
334
335    fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
336        Ok(())
337    }
338}
339
340pub type OpenTelemetryProtocolInterceptorRef<E> =
341    Arc<dyn OpenTelemetryProtocolInterceptor<Error = E> + Send + Sync + 'static>;
342
343impl<E: ErrorExt> OpenTelemetryProtocolInterceptor
344    for Option<OpenTelemetryProtocolInterceptorRef<E>>
345{
346    type Error = E;
347
348    fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
349        if let Some(this) = self {
350            this.pre_execute(query_ctx)
351        } else {
352            Ok(())
353        }
354    }
355}
356
357/// PromStoreProtocolInterceptor can track life cycle of a prom store request
358/// and customize or abort its execution at given point.
359pub trait PromStoreProtocolInterceptor {
360    type Error: ErrorExt;
361
362    fn pre_write(
363        &self,
364        _write_req: &RowInsertRequests,
365        _ctx: QueryContextRef,
366    ) -> Result<(), Self::Error> {
367        Ok(())
368    }
369
370    fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> {
371        Ok(())
372    }
373}
374
375pub type PromStoreProtocolInterceptorRef<E> =
376    Arc<dyn PromStoreProtocolInterceptor<Error = E> + Send + Sync + 'static>;
377
378impl<E: ErrorExt> PromStoreProtocolInterceptor for Option<PromStoreProtocolInterceptorRef<E>> {
379    type Error = E;
380
381    fn pre_write(
382        &self,
383        write_req: &RowInsertRequests,
384        ctx: QueryContextRef,
385    ) -> Result<(), Self::Error> {
386        if let Some(this) = self {
387            this.pre_write(write_req, ctx)
388        } else {
389            Ok(())
390        }
391    }
392
393    fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
394        if let Some(this) = self {
395            this.pre_read(read_req, ctx)
396        } else {
397            Ok(())
398        }
399    }
400}
401
402/// LogIngestInterceptor can track life cycle of a log ingestion request
403/// and customize or abort its execution at given point.
404pub trait LogIngestInterceptor {
405    type Error: ErrorExt;
406
407    /// Called before pipeline execution.
408    fn pre_pipeline(
409        &self,
410        values: Vec<Value>,
411        _query_ctx: QueryContextRef,
412    ) -> Result<Vec<Value>, Self::Error> {
413        Ok(values)
414    }
415
416    /// Called before insertion.
417    fn pre_ingest(
418        &self,
419        request: RowInsertRequests,
420        _query_ctx: QueryContextRef,
421    ) -> Result<RowInsertRequests, Self::Error> {
422        Ok(request)
423    }
424}
425
426pub type LogIngestInterceptorRef<E> =
427    Arc<dyn LogIngestInterceptor<Error = E> + Send + Sync + 'static>;
428
429impl<E> LogIngestInterceptor for Option<&LogIngestInterceptorRef<E>>
430where
431    E: ErrorExt,
432{
433    type Error = E;
434
435    fn pre_pipeline(
436        &self,
437        values: Vec<Value>,
438        query_ctx: QueryContextRef,
439    ) -> Result<Vec<Value>, Self::Error> {
440        if let Some(this) = self {
441            this.pre_pipeline(values, query_ctx)
442        } else {
443            Ok(values)
444        }
445    }
446
447    fn pre_ingest(
448        &self,
449        request: RowInsertRequests,
450        query_ctx: QueryContextRef,
451    ) -> Result<RowInsertRequests, Self::Error> {
452        if let Some(this) = self {
453            this.pre_ingest(request, query_ctx)
454        } else {
455            Ok(request)
456        }
457    }
458}
459
460/// LogQueryInterceptor can track life cycle of a log query request
461/// and customize or abort its execution at given point.
462pub trait LogQueryInterceptor {
463    type Error: ErrorExt;
464
465    /// Called before query is actually executed.
466    fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
467        Ok(())
468    }
469
470    /// Called after execution finished. The implementation can modify the
471    /// output if needed.
472    fn post_query(
473        &self,
474        output: Output,
475        _query_ctx: QueryContextRef,
476    ) -> Result<Output, Self::Error> {
477        Ok(output)
478    }
479}
480
481pub type LogQueryInterceptorRef<E> =
482    Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>;
483
484impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>>
485where
486    E: ErrorExt,
487{
488    type Error = E;
489
490    fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
491        if let Some(this) = self {
492            this.pre_query(query, query_ctx)
493        } else {
494            Ok(())
495        }
496    }
497
498    fn post_query(
499        &self,
500        output: Output,
501        query_ctx: QueryContextRef,
502    ) -> Result<Output, Self::Error> {
503        if let Some(this) = self {
504            this.post_query(output, query_ctx)
505        } else {
506            Ok(output)
507        }
508    }
509}