1use std::borrow::Cow;
16use std::sync::Arc;
17
18use api::prom_store::remote::ReadRequest;
19use api::v1::greptime_request::Request;
20use api::v1::RowInsertRequests;
21use async_trait::async_trait;
22use common_error::ext::ErrorExt;
23use common_query::Output;
24use datafusion_expr::LogicalPlan;
25use log_query::LogQuery;
26use pipeline::PipelineMap;
27use query::parser::PromQuery;
28use session::context::QueryContextRef;
29use sql::statements::statement::Statement;
30
31pub trait SqlQueryInterceptor {
34 type Error: ErrorExt;
35
36 fn pre_parsing<'a>(
39 &self,
40 query: &'a str,
41 _query_ctx: QueryContextRef,
42 ) -> Result<Cow<'a, str>, Self::Error> {
43 Ok(Cow::Borrowed(query))
44 }
45
46 fn post_parsing(
50 &self,
51 statements: Vec<Statement>,
52 _query_ctx: QueryContextRef,
53 ) -> Result<Vec<Statement>, Self::Error> {
54 Ok(statements)
55 }
56
57 fn pre_execute(
59 &self,
60 _statement: &Statement,
61 _plan: Option<&LogicalPlan>,
62 _query_ctx: QueryContextRef,
63 ) -> Result<(), Self::Error> {
64 Ok(())
65 }
66
67 fn post_execute(
70 &self,
71 output: Output,
72 _query_ctx: QueryContextRef,
73 ) -> Result<Output, Self::Error> {
74 Ok(output)
75 }
76}
77
78pub type SqlQueryInterceptorRef<E> =
79 Arc<dyn SqlQueryInterceptor<Error = E> + Send + Sync + 'static>;
80
81impl<E> SqlQueryInterceptor for Option<&SqlQueryInterceptorRef<E>>
82where
83 E: ErrorExt,
84{
85 type Error = E;
86
87 fn pre_parsing<'a>(
88 &self,
89 query: &'a str,
90 query_ctx: QueryContextRef,
91 ) -> Result<Cow<'a, str>, Self::Error> {
92 if let Some(this) = self {
93 this.pre_parsing(query, query_ctx)
94 } else {
95 Ok(Cow::Borrowed(query))
96 }
97 }
98
99 fn post_parsing(
100 &self,
101 statements: Vec<Statement>,
102 query_ctx: QueryContextRef,
103 ) -> Result<Vec<Statement>, Self::Error> {
104 if let Some(this) = self {
105 this.post_parsing(statements, query_ctx)
106 } else {
107 Ok(statements)
108 }
109 }
110
111 fn pre_execute(
112 &self,
113 statement: &Statement,
114 plan: Option<&LogicalPlan>,
115 query_ctx: QueryContextRef,
116 ) -> Result<(), Self::Error> {
117 if let Some(this) = self {
118 this.pre_execute(statement, plan, query_ctx)
119 } else {
120 Ok(())
121 }
122 }
123
124 fn post_execute(
125 &self,
126 output: Output,
127 query_ctx: QueryContextRef,
128 ) -> Result<Output, Self::Error> {
129 if let Some(this) = self {
130 this.post_execute(output, query_ctx)
131 } else {
132 Ok(output)
133 }
134 }
135}
136
137pub trait GrpcQueryInterceptor {
140 type Error: ErrorExt;
141
142 fn pre_execute(
144 &self,
145 _request: &Request,
146 _query_ctx: QueryContextRef,
147 ) -> Result<(), Self::Error> {
148 Ok(())
149 }
150
151 fn post_execute(
154 &self,
155 output: Output,
156 _query_ctx: QueryContextRef,
157 ) -> Result<Output, Self::Error> {
158 Ok(output)
159 }
160}
161
162pub type GrpcQueryInterceptorRef<E> =
163 Arc<dyn GrpcQueryInterceptor<Error = E> + Send + Sync + 'static>;
164
165impl<E> GrpcQueryInterceptor for Option<&GrpcQueryInterceptorRef<E>>
166where
167 E: ErrorExt,
168{
169 type Error = E;
170
171 fn pre_execute(
172 &self,
173 _request: &Request,
174 _query_ctx: QueryContextRef,
175 ) -> Result<(), Self::Error> {
176 if let Some(this) = self {
177 this.pre_execute(_request, _query_ctx)
178 } else {
179 Ok(())
180 }
181 }
182
183 fn post_execute(
184 &self,
185 output: Output,
186 _query_ctx: QueryContextRef,
187 ) -> Result<Output, Self::Error> {
188 if let Some(this) = self {
189 this.post_execute(output, _query_ctx)
190 } else {
191 Ok(output)
192 }
193 }
194}
195
196pub trait PromQueryInterceptor {
199 type Error: ErrorExt;
200
201 fn pre_execute(
203 &self,
204 _query: &PromQuery,
205 _plan: Option<&LogicalPlan>,
206 _query_ctx: QueryContextRef,
207 ) -> Result<(), Self::Error> {
208 Ok(())
209 }
210
211 fn post_execute(
214 &self,
215 output: Output,
216 _query_ctx: QueryContextRef,
217 ) -> Result<Output, Self::Error> {
218 Ok(output)
219 }
220}
221
222pub type PromQueryInterceptorRef<E> =
223 Arc<dyn PromQueryInterceptor<Error = E> + Send + Sync + 'static>;
224
225impl<E> PromQueryInterceptor for Option<PromQueryInterceptorRef<E>>
226where
227 E: ErrorExt,
228{
229 type Error = E;
230
231 fn pre_execute(
232 &self,
233 query: &PromQuery,
234 plan: Option<&LogicalPlan>,
235 query_ctx: QueryContextRef,
236 ) -> Result<(), Self::Error> {
237 if let Some(this) = self {
238 this.pre_execute(query, plan, query_ctx)
239 } else {
240 Ok(())
241 }
242 }
243
244 fn post_execute(
245 &self,
246 output: Output,
247 query_ctx: QueryContextRef,
248 ) -> Result<Output, Self::Error> {
249 if let Some(this) = self {
250 this.post_execute(output, query_ctx)
251 } else {
252 Ok(output)
253 }
254 }
255}
256
257#[async_trait]
260pub trait LineProtocolInterceptor {
261 type Error: ErrorExt;
262
263 fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
264 Ok(())
265 }
266
267 async fn post_lines_conversion(
271 &self,
272 requests: RowInsertRequests,
273 query_context: QueryContextRef,
274 ) -> Result<RowInsertRequests, Self::Error> {
275 let _ = query_context;
276 Ok(requests)
277 }
278}
279
280pub type LineProtocolInterceptorRef<E> =
281 Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
282
283#[async_trait]
284impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
285 type Error = E;
286
287 fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
288 if let Some(this) = self {
289 this.pre_execute(line, query_ctx)
290 } else {
291 Ok(())
292 }
293 }
294
295 async fn post_lines_conversion(
296 &self,
297 requests: RowInsertRequests,
298 query_context: QueryContextRef,
299 ) -> Result<RowInsertRequests, Self::Error> {
300 if let Some(this) = self {
301 this.post_lines_conversion(requests, query_context).await
302 } else {
303 Ok(requests)
304 }
305 }
306}
307
308pub trait OpenTelemetryProtocolInterceptor {
311 type Error: ErrorExt;
312
313 fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
314 Ok(())
315 }
316}
317
318pub type OpenTelemetryProtocolInterceptorRef<E> =
319 Arc<dyn OpenTelemetryProtocolInterceptor<Error = E> + Send + Sync + 'static>;
320
321impl<E: ErrorExt> OpenTelemetryProtocolInterceptor
322 for Option<OpenTelemetryProtocolInterceptorRef<E>>
323{
324 type Error = E;
325
326 fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
327 if let Some(this) = self {
328 this.pre_execute(query_ctx)
329 } else {
330 Ok(())
331 }
332 }
333}
334
335pub trait PromStoreProtocolInterceptor {
338 type Error: ErrorExt;
339
340 fn pre_write(
341 &self,
342 _write_req: &RowInsertRequests,
343 _ctx: QueryContextRef,
344 ) -> Result<(), Self::Error> {
345 Ok(())
346 }
347
348 fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> {
349 Ok(())
350 }
351}
352
353pub type PromStoreProtocolInterceptorRef<E> =
354 Arc<dyn PromStoreProtocolInterceptor<Error = E> + Send + Sync + 'static>;
355
356impl<E: ErrorExt> PromStoreProtocolInterceptor for Option<PromStoreProtocolInterceptorRef<E>> {
357 type Error = E;
358
359 fn pre_write(
360 &self,
361 write_req: &RowInsertRequests,
362 ctx: QueryContextRef,
363 ) -> Result<(), Self::Error> {
364 if let Some(this) = self {
365 this.pre_write(write_req, ctx)
366 } else {
367 Ok(())
368 }
369 }
370
371 fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
372 if let Some(this) = self {
373 this.pre_read(read_req, ctx)
374 } else {
375 Ok(())
376 }
377 }
378}
379
380pub trait LogIngestInterceptor {
383 type Error: ErrorExt;
384
385 fn pre_pipeline(
387 &self,
388 values: Vec<PipelineMap>,
389 _query_ctx: QueryContextRef,
390 ) -> Result<Vec<PipelineMap>, Self::Error> {
391 Ok(values)
392 }
393
394 fn pre_ingest(
396 &self,
397 request: RowInsertRequests,
398 _query_ctx: QueryContextRef,
399 ) -> Result<RowInsertRequests, Self::Error> {
400 Ok(request)
401 }
402}
403
404pub type LogIngestInterceptorRef<E> =
405 Arc<dyn LogIngestInterceptor<Error = E> + Send + Sync + 'static>;
406
407impl<E> LogIngestInterceptor for Option<&LogIngestInterceptorRef<E>>
408where
409 E: ErrorExt,
410{
411 type Error = E;
412
413 fn pre_pipeline(
414 &self,
415 values: Vec<PipelineMap>,
416 query_ctx: QueryContextRef,
417 ) -> Result<Vec<PipelineMap>, Self::Error> {
418 if let Some(this) = self {
419 this.pre_pipeline(values, query_ctx)
420 } else {
421 Ok(values)
422 }
423 }
424
425 fn pre_ingest(
426 &self,
427 request: RowInsertRequests,
428 query_ctx: QueryContextRef,
429 ) -> Result<RowInsertRequests, Self::Error> {
430 if let Some(this) = self {
431 this.pre_ingest(request, query_ctx)
432 } else {
433 Ok(request)
434 }
435 }
436}
437
438pub trait LogQueryInterceptor {
441 type Error: ErrorExt;
442
443 fn pre_query(&self, _query: &LogQuery, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
445 Ok(())
446 }
447
448 fn post_query(
451 &self,
452 output: Output,
453 _query_ctx: QueryContextRef,
454 ) -> Result<Output, Self::Error> {
455 Ok(output)
456 }
457}
458
459pub type LogQueryInterceptorRef<E> =
460 Arc<dyn LogQueryInterceptor<Error = E> + Send + Sync + 'static>;
461
462impl<E> LogQueryInterceptor for Option<&LogQueryInterceptorRef<E>>
463where
464 E: ErrorExt,
465{
466 type Error = E;
467
468 fn pre_query(&self, query: &LogQuery, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
469 if let Some(this) = self {
470 this.pre_query(query, query_ctx)
471 } else {
472 Ok(())
473 }
474 }
475
476 fn post_query(
477 &self,
478 output: Output,
479 query_ctx: QueryContextRef,
480 ) -> Result<Output, Self::Error> {
481 if let Some(this) = self {
482 this.post_query(output, query_ctx)
483 } else {
484 Ok(output)
485 }
486 }
487}