1use std::borrow::Cow;
16use std::sync::Arc;
17
18use api::prom_store::remote::ReadRequest;
19use api::v1::RowInsertRequests;
20use api::v1::greptime_request::Request;
21use async_trait::async_trait;
22use common_error::ext::ErrorExt;
23use common_query::Output;
24use datafusion_expr::LogicalPlan;
25use log_query::LogQuery;
26use query::parser::PromQuery;
27use session::context::QueryContextRef;
28use sql::statements::statement::Statement;
29use table::TableRef;
30use vrl::value::Value;
31
32pub trait SqlQueryInterceptor {
35 type Error: ErrorExt;
36
37 fn pre_parsing<'a>(
40 &self,
41 query: &'a str,
42 _query_ctx: QueryContextRef,
43 ) -> Result<Cow<'a, str>, Self::Error> {
44 Ok(Cow::Borrowed(query))
45 }
46
47 fn post_parsing(
51 &self,
52 statements: Vec<Statement>,
53 _query_ctx: QueryContextRef,
54 ) -> Result<Vec<Statement>, Self::Error> {
55 Ok(statements)
56 }
57
58 fn pre_execute(
60 &self,
61 _statement: &Statement,
62 _plan: Option<&LogicalPlan>,
63 _query_ctx: QueryContextRef,
64 ) -> Result<(), Self::Error> {
65 Ok(())
66 }
67
68 fn post_execute(
71 &self,
72 output: Output,
73 _query_ctx: QueryContextRef,
74 ) -> Result<Output, Self::Error> {
75 Ok(output)
76 }
77}
78
79pub type SqlQueryInterceptorRef<E> =
80 Arc<dyn SqlQueryInterceptor<Error = E> + Send + Sync + 'static>;
81
82impl<E> SqlQueryInterceptor for Option<&SqlQueryInterceptorRef<E>>
83where
84 E: ErrorExt,
85{
86 type Error = E;
87
88 fn pre_parsing<'a>(
89 &self,
90 query: &'a str,
91 query_ctx: QueryContextRef,
92 ) -> Result<Cow<'a, str>, Self::Error> {
93 if let Some(this) = self {
94 this.pre_parsing(query, query_ctx)
95 } else {
96 Ok(Cow::Borrowed(query))
97 }
98 }
99
100 fn post_parsing(
101 &self,
102 statements: Vec<Statement>,
103 query_ctx: QueryContextRef,
104 ) -> Result<Vec<Statement>, Self::Error> {
105 if let Some(this) = self {
106 this.post_parsing(statements, query_ctx)
107 } else {
108 Ok(statements)
109 }
110 }
111
112 fn pre_execute(
113 &self,
114 statement: &Statement,
115 plan: Option<&LogicalPlan>,
116 query_ctx: QueryContextRef,
117 ) -> Result<(), Self::Error> {
118 if let Some(this) = self {
119 this.pre_execute(statement, plan, query_ctx)
120 } else {
121 Ok(())
122 }
123 }
124
125 fn post_execute(
126 &self,
127 output: Output,
128 query_ctx: QueryContextRef,
129 ) -> Result<Output, Self::Error> {
130 if let Some(this) = self {
131 this.post_execute(output, query_ctx)
132 } else {
133 Ok(output)
134 }
135 }
136}
137
138pub trait GrpcQueryInterceptor {
141 type Error: ErrorExt;
142
143 fn pre_execute(
145 &self,
146 _request: &Request,
147 _query_ctx: QueryContextRef,
148 ) -> Result<(), Self::Error> {
149 Ok(())
150 }
151
152 fn pre_bulk_insert(
154 &self,
155 _table: TableRef,
156 _query_ctx: QueryContextRef,
157 ) -> Result<(), Self::Error> {
158 Ok(())
159 }
160
161 fn post_execute(
164 &self,
165 output: Output,
166 _query_ctx: QueryContextRef,
167 ) -> Result<Output, Self::Error> {
168 Ok(output)
169 }
170}
171
172pub type GrpcQueryInterceptorRef<E> =
173 Arc<dyn GrpcQueryInterceptor<Error = E> + Send + Sync + 'static>;
174
175impl<E> GrpcQueryInterceptor for Option<&GrpcQueryInterceptorRef<E>>
176where
177 E: ErrorExt,
178{
179 type Error = E;
180
181 fn pre_execute(
182 &self,
183 _request: &Request,
184 _query_ctx: QueryContextRef,
185 ) -> Result<(), Self::Error> {
186 if let Some(this) = self {
187 this.pre_execute(_request, _query_ctx)
188 } else {
189 Ok(())
190 }
191 }
192
193 fn pre_bulk_insert(
194 &self,
195 _table: TableRef,
196 _query_ctx: QueryContextRef,
197 ) -> Result<(), Self::Error> {
198 if let Some(this) = self {
199 this.pre_bulk_insert(_table, _query_ctx)
200 } else {
201 Ok(())
202 }
203 }
204
205 fn post_execute(
206 &self,
207 output: Output,
208 _query_ctx: QueryContextRef,
209 ) -> Result<Output, Self::Error> {
210 if let Some(this) = self {
211 this.post_execute(output, _query_ctx)
212 } else {
213 Ok(output)
214 }
215 }
216}
217
218pub trait PromQueryInterceptor {
221 type Error: ErrorExt;
222
223 fn pre_execute(
225 &self,
226 _query: &PromQuery,
227 _plan: Option<&LogicalPlan>,
228 _query_ctx: QueryContextRef,
229 ) -> Result<(), Self::Error> {
230 Ok(())
231 }
232
233 fn post_execute(
236 &self,
237 output: Output,
238 _query_ctx: QueryContextRef,
239 ) -> Result<Output, Self::Error> {
240 Ok(output)
241 }
242}
243
244pub type PromQueryInterceptorRef<E> =
245 Arc<dyn PromQueryInterceptor<Error = E> + Send + Sync + 'static>;
246
247impl<E> PromQueryInterceptor for Option<PromQueryInterceptorRef<E>>
248where
249 E: ErrorExt,
250{
251 type Error = E;
252
253 fn pre_execute(
254 &self,
255 query: &PromQuery,
256 plan: Option<&LogicalPlan>,
257 query_ctx: QueryContextRef,
258 ) -> Result<(), Self::Error> {
259 if let Some(this) = self {
260 this.pre_execute(query, plan, query_ctx)
261 } else {
262 Ok(())
263 }
264 }
265
266 fn post_execute(
267 &self,
268 output: Output,
269 query_ctx: QueryContextRef,
270 ) -> Result<Output, Self::Error> {
271 if let Some(this) = self {
272 this.post_execute(output, query_ctx)
273 } else {
274 Ok(output)
275 }
276 }
277}
278
279#[async_trait]
282pub trait LineProtocolInterceptor {
283 type Error: ErrorExt;
284
285 fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
286 Ok(())
287 }
288
289 async fn post_lines_conversion(
293 &self,
294 requests: RowInsertRequests,
295 query_context: QueryContextRef,
296 ) -> Result<RowInsertRequests, Self::Error> {
297 let _ = query_context;
298 Ok(requests)
299 }
300}
301
302pub type LineProtocolInterceptorRef<E> =
303 Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
304
305#[async_trait]
306impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
307 type Error = E;
308
309 fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
310 if let Some(this) = self {
311 this.pre_execute(line, query_ctx)
312 } else {
313 Ok(())
314 }
315 }
316
317 async fn post_lines_conversion(
318 &self,
319 requests: RowInsertRequests,
320 query_context: QueryContextRef,
321 ) -> Result<RowInsertRequests, Self::Error> {
322 if let Some(this) = self {
323 this.post_lines_conversion(requests, query_context).await
324 } else {
325 Ok(requests)
326 }
327 }
328}
329
330pub trait OpenTelemetryProtocolInterceptor {
333 type Error: ErrorExt;
334
335 fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
336 Ok(())
337 }
338}
339
340pub type OpenTelemetryProtocolInterceptorRef<E> =
341 Arc<dyn OpenTelemetryProtocolInterceptor<Error = E> + Send + Sync + 'static>;
342
343impl<E: ErrorExt> OpenTelemetryProtocolInterceptor
344 for Option<OpenTelemetryProtocolInterceptorRef<E>>
345{
346 type Error = E;
347
348 fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
349 if let Some(this) = self {
350 this.pre_execute(query_ctx)
351 } else {
352 Ok(())
353 }
354 }
355}
356
357pub trait PromStoreProtocolInterceptor {
360 type Error: ErrorExt;
361
362 fn pre_write(
363 &self,
364 _write_req: &RowInsertRequests,
365 _ctx: QueryContextRef,
366 ) -> Result<(), Self::Error> {
367 Ok(())
368 }
369
370 fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> {
371 Ok(())
372 }
373}
374
375pub type PromStoreProtocolInterceptorRef<E> =
376 Arc<dyn PromStoreProtocolInterceptor<Error = E> + Send + Sync + 'static>;
377
378impl<E: ErrorExt> PromStoreProtocolInterceptor for Option<PromStoreProtocolInterceptorRef<E>> {
379 type Error = E;
380
381 fn pre_write(
382 &self,
383 write_req: &RowInsertRequests,
384 ctx: QueryContextRef,
385 ) -> Result<(), Self::Error> {
386 if let Some(this) = self {
387 this.pre_write(write_req, ctx)
388 } else {
389 Ok(())
390 }
391 }
392
393 fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
394 if let Some(this) = self {
395 this.pre_read(read_req, ctx)
396 } else {
397 Ok(())
398 }
399 }
400}
401
402pub trait LogIngestInterceptor {
405 type Error: ErrorExt;
406
407 fn pre_pipeline(
409 &self,
410 values: Vec<Value>,
411 _query_ctx: QueryContextRef,
412 ) -> Result<Vec<Value>, Self::Error> {
413 Ok(values)
414 }
415
416 fn pre_ingest(
418 &self,
419 request: RowInsertRequests,
420 _query_ctx: QueryContextRef,
421 ) -> Result<RowInsertRequests, Self::Error> {
422 Ok(request)
423 }
424}
425
426pub type LogIngestInterceptorRef<E> =
427 Arc<dyn LogIngestInterceptor<Error = E> + Send + Sync + 'static>;
428
429impl<E> LogIngestInterceptor for Option<&LogIngestInterceptorRef<E>>
430where
431 E: ErrorExt,
432{
433 type Error = E;
434
435 fn pre_pipeline(
436 &self,
437 values: Vec<Value>,
438 query_ctx: QueryContextRef,
439 ) -> Result<Vec<Value>, Self::Error> {
440 if let Some(this) = self {
441 this.pre_pipeline(values, query_ctx)
442 } else {
443 Ok(values)
444 }
445 }
446
447 fn pre_ingest(
448 &self,
449 request: RowInsertRequests,
450 query_ctx: QueryContextRef,
451 ) -> Result<RowInsertRequests, Self::Error> {
452 if let Some(this) = self {
453 this.pre_ingest(request, query_ctx)
454 } else {
455 Ok(request)
456 }
457 }
458}
459
460pub trait LogQueryInterceptor {
463 type Error: ErrorExt;
464
465 fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
467 Ok(())
468 }
469
470 fn post_query(
473 &self,
474 output: Output,
475 _query_ctx: QueryContextRef,
476 ) -> Result<Output, Self::Error> {
477 Ok(output)
478 }
479}
480
481pub type LogQueryInterceptorRef<E> =
482 Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>;
483
484impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>>
485where
486 E: ErrorExt,
487{
488 type Error = E;
489
490 fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
491 if let Some(this) = self {
492 this.pre_query(query, query_ctx)
493 } else {
494 Ok(())
495 }
496 }
497
498 fn post_query(
499 &self,
500 output: Output,
501 query_ctx: QueryContextRef,
502 ) -> Result<Output, Self::Error> {
503 if let Some(this) = self {
504 this.post_query(output, query_ctx)
505 } else {
506 Ok(output)
507 }
508 }
509}