servers/
query_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All query handler traits for various request protocols, like SQL or GRPC.
16//!
17//! An instance that wishes to support a certain request protocol only needs to implement the
18//! corresponding trait; the server will handle the codec for you.
19//!
20//! Note:
21//! Query handlers are not confined to handling read requests; they are expected to handle
22//! write requests too, so the word "query" here might seem ambiguous. However, "query" has been
23//! used as a kind of convention (it's the "Q" in "SQL"), so we had better stick to the
24//! word "query".
25
26pub mod grpc;
27pub mod sql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::prom_store::remote::ReadRequest;
33use api::v1::RowInsertRequests;
34use async_trait::async_trait;
35use catalog::CatalogManager;
36use common_query::Output;
37use datatypes::timestamp::TimestampNanosecond;
38use headers::HeaderValue;
39use log_query::LogQuery;
40use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
41use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
42use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
43use pipeline::{GreptimePipelineParams, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
44use serde_json::Value;
45use session::context::{QueryContext, QueryContextRef};
46
47use crate::error::Result;
48use crate::http::jaeger::QueryTraceParams;
49use crate::influxdb::InfluxdbRequest;
50use crate::opentsdb::codec::DataPoint;
51use crate::prom_store::Metrics;
// Each alias below is an `Arc`-wrapped trait object that is additionally bounded by
// `Send + Sync`, so a handler can be cloned cheaply and shared across threads.

/// Shared (`Arc`) trait object for an [`OpentsdbProtocolHandler`].
52pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
/// Shared (`Arc`) trait object for an [`InfluxdbLineProtocolHandler`].
53pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
/// Shared (`Arc`) trait object for a [`PromStoreProtocolHandler`].
54pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
/// Shared (`Arc`) trait object for an [`OpenTelemetryProtocolHandler`].
55pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
/// Shared (`Arc`) trait object for a [`PipelineHandler`].
56pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
/// Shared (`Arc`) trait object for a [`LogQueryHandler`].
57pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
/// Shared (`Arc`) trait object for a [`JaegerQueryHandler`].
58pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
59
/// Handler for requests received over the InfluxDB line protocol.
60#[async_trait]
61pub trait InfluxdbLineProtocolHandler {
    /// Executes `request` under the query context `ctx`.
    ///
62    /// A successful request will not return a response.
63    /// Only on error will the socket return a line of data.
64    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
65}
66
/// Handler for data points received over the OpenTSDB protocol.
67#[async_trait]
68pub trait OpentsdbProtocolHandler {
    /// Executes a batch of `data_points` under the query context `ctx`.
    ///
69    /// A successful request will not return a response.
70    /// Only on error will the socket return a line of data.
    ///
    /// NOTE(review): the returned `usize` presumably counts the data points (or rows) that were
    /// written — confirm against the implementors.
71    async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
72}
73
/// Response produced by [`PromStoreProtocolHandler::read`].
74pub struct PromStoreResponse {
    /// Header value describing the content type of `body`.
    /// NOTE(review): presumably sent as the HTTP `Content-Type` header — confirm at the call site.
75    pub content_type: HeaderValue,
    /// Header value describing the encoding of `body`.
    /// NOTE(review): presumably sent as the HTTP `Content-Encoding` header — confirm.
76    pub content_encoding: HeaderValue,
    /// Named JSON values attached to the response.
    /// NOTE(review): presumably execution metrics reported alongside the body — confirm.
77    pub resp_metrics: HashMap<String, Value>,
    /// Raw response payload bytes.
78    pub body: Vec<u8>,
79}
80
/// Handler for Prometheus remote write, remote read, and push gateway requests.
81#[async_trait]
82pub trait PromStoreProtocolHandler {
83    /// Handling prometheus remote write requests
    ///
    /// The data to write is passed in as [`RowInsertRequests`].
    ///
    /// NOTE(review): `with_metric_engine` presumably selects whether the rows are written through
    /// the metric engine — confirm against the implementors.
84    async fn write(
85        &self,
86        request: RowInsertRequests,
87        ctx: QueryContextRef,
88        with_metric_engine: bool,
89    ) -> Result<Output>;
90
91    /// Handling prometheus remote read requests
    ///
    /// On success returns a [`PromStoreResponse`] holding the header values, response metrics
    /// and body bytes.
92    async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
93    /// Handling push gateway requests
    ///
    /// Unlike `write` and `read`, this method does not take a query context.
94    async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
95}
96
/// Handler for OpenTelemetry export requests (metrics, traces and logs).
///
/// Implementors must also implement [`PipelineHandler`], which is a supertrait of this trait.
97#[async_trait]
98pub trait OpenTelemetryProtocolHandler: PipelineHandler {
99    /// Handling opentelemetry metrics request
100    async fn metrics(
101        &self,
102        request: ExportMetricsServiceRequest,
103        ctx: QueryContextRef,
104    ) -> Result<Output>;
105
106    /// Handling opentelemetry traces request
    ///
    /// Besides the export request itself, the caller supplies a `pipeline_handler`, the
    /// `pipeline` to use together with its `pipeline_params`, and a `table_name`.
    ///
    /// NOTE(review): `table_name` is presumably the table the traces are written to — confirm.
107    async fn traces(
108        &self,
109        pipeline_handler: PipelineHandlerRef,
110        request: ExportTraceServiceRequest,
111        pipeline: PipelineWay,
112        pipeline_params: GreptimePipelineParams,
113        table_name: String,
114        ctx: QueryContextRef,
115    ) -> Result<Output>;
116
    /// Handling opentelemetry logs request
    ///
    /// Takes the same set of parameters as `traces`, with an [`ExportLogsServiceRequest`]
    /// as the request.
    ///
    /// NOTE(review): `table_name` is presumably the table the logs are written to — confirm.
117    async fn logs(
118        &self,
119        pipeline_handler: PipelineHandlerRef,
120        request: ExportLogsServiceRequest,
121        pipeline: PipelineWay,
122        pipeline_params: GreptimePipelineParams,
123        table_name: String,
124        ctx: QueryContextRef,
125    ) -> Result<Output>;
126}
127
128/// PipelineHandler is responsible for handling pipeline related requests.
129///
130/// The "Pipeline" is a series of transformations that can be applied to unstructured
131/// data like logs. This handler is responsible to manage pipelines and accept data for
132/// processing.
133///
134/// The pipeline is stored in the database and can be retrieved by its name.
135#[async_trait]
136pub trait PipelineHandler {
    /// Insert the rows in `input` under the given query context.
137    async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
138
    /// Get a pipeline by `name` and `version`, returned as a shared [`Pipeline`].
139    async fn get_pipeline(
140        &self,
141        name: &str,
142        version: PipelineVersion,
143        query_ctx: QueryContextRef,
144    ) -> Result<Arc<Pipeline>>;
145
    /// Store a pipeline under `name` and return its [`PipelineInfo`].
    ///
    /// `pipeline` is the pipeline definition as text and `content_type` describes its format.
    /// NOTE(review): accepted `content_type` values are not visible here — confirm.
146    async fn insert_pipeline(
147        &self,
148        name: &str,
149        content_type: &str,
150        pipeline: &str,
151        query_ctx: QueryContextRef,
152    ) -> Result<PipelineInfo>;
153
    /// Delete the pipeline identified by `name` and `version`.
    ///
    /// NOTE(review): `Ok(None)` presumably means no matching pipeline existed — confirm.
154    async fn delete_pipeline(
155        &self,
156        name: &str,
157        version: PipelineVersion,
158        query_ctx: QueryContextRef,
159    ) -> Result<Option<()>>;
160
    /// Look up a table by name; yields `None` inside `Ok` when no table is returned.
    ///
    /// Note that failures are reported as `catalog::error::Error`, not through this crate's
    /// `Result` alias like the other methods of this trait.
161    async fn get_table(
162        &self,
163        table: &str,
164        query_ctx: &QueryContext,
165    ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
166
167    /// Build a pipeline from a string.
    ///
    /// This is the only synchronous method of the trait and takes no query context.
168    fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
169
170    /// Get the original pipeline by name.
    ///
    /// Returns the pipeline as a `String` together with a [`TimestampNanosecond`].
    /// NOTE(review): the timestamp is presumably the pipeline's version/creation time — confirm.
171    async fn get_pipeline_str(
172        &self,
173        name: &str,
174        version: PipelineVersion,
175        query_ctx: QueryContextRef,
176    ) -> Result<(String, TimestampNanosecond)>;
177}
178
179/// Handle log query requests.
180#[async_trait]
181pub trait LogQueryHandler {
182    /// Execute a log query.
    ///
    /// Runs the given [`LogQuery`] under the query context `ctx`.
183    async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
184
185    /// Get catalog manager.
    ///
    /// Synchronous and fallible; on success the manager is borrowed from `self` as a
    /// `&dyn CatalogManager`.
186    fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
187}
188
189/// Handle Jaeger query requests.
190#[async_trait]
191pub trait JaegerQueryHandler {
192    /// Get trace services. It's used for `/api/services` API.
193    async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
194
195    /// Get Jaeger operations. It's used for `/api/operations` and `/api/services/{service_name}/operations` API.
    ///
    /// `service_name` is required; `span_kind`, `start_time` and `end_time` are optional.
    ///
    /// NOTE(review): the unit of `start_time` / `end_time` is not visible here — confirm
    /// against the HTTP layer that calls this method.
196    async fn get_operations(
197        &self,
198        ctx: QueryContextRef,
199        service_name: &str,
200        span_kind: Option<&str>,
201        start_time: Option<i64>,
202        end_time: Option<i64>,
203    ) -> Result<Output>;
204
205    /// Get trace by trace id. It's used for `/api/traces/{trace_id}` API.
206    async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
207
208    /// Find traces by query params. It's used for `/api/traces` API.
    ///
    /// The search criteria are carried by [`QueryTraceParams`].
209    async fn find_traces(
210        &self,
211        ctx: QueryContextRef,
212        query_params: QueryTraceParams,
213    ) -> Result<Output>;
214}