Skip to main content

servers/
query_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All query handler traits for various request protocols, like SQL or GRPC.
16//!
17//! Instance that wishes to support certain request protocol, just implement the corresponding
18//! trait, the Server will handle codec for you.
19//!
20//! Note:
21//! Query handlers are not confined to only handle read requests, they are expecting to handle
22//! write requests too. So the "query" here not might seem ambiguity. However, "query" has been
23//! used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the
24//! word "query".
25
26pub mod grpc;
27pub mod sql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::prom_store::remote::ReadRequest;
33use api::v1::RowInsertRequests;
34use async_trait::async_trait;
35use catalog::CatalogManager;
36use common_query::Output;
37use datatypes::timestamp::TimestampNanosecond;
38use headers::HeaderValue;
39use log_query::LogQuery;
40use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
41use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
42use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
43use pipeline::{GreptimePipelineParams, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
44use serde_json::Value;
45use session::context::{QueryContext, QueryContextRef};
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48pub struct DashboardDefinition {
49    pub name: String,
50    pub definition: String,
51}
52
53use crate::error::Result;
54use crate::http::jaeger::QueryTraceParams;
55use crate::influxdb::InfluxdbRequest;
56use crate::opentsdb::codec::DataPoint;
57use crate::prom_store::Metrics;
58pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
59pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
60pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
61pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
62pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
63pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
64pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
65
66#[derive(Debug, Default, Clone)]
67pub struct TraceIngestOutcome {
68    pub write_cost: usize,
69    pub accepted_spans: usize,
70    pub rejected_spans: usize,
71    pub error_message: Option<String>,
72}
73
74#[async_trait]
75pub trait InfluxdbLineProtocolHandler {
76    /// A successful request will not return a response.
77    /// Only on error will the socket return a line of data.
78    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
79}
80
81#[async_trait]
82pub trait OpentsdbProtocolHandler {
83    /// A successful request will not return a response.
84    /// Only on error will the socket return a line of data.
85    async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
86}
87
88pub struct PromStoreResponse {
89    pub content_type: HeaderValue,
90    pub content_encoding: HeaderValue,
91    pub resp_metrics: HashMap<String, Value>,
92    pub body: Vec<u8>,
93}
94
95#[async_trait]
96pub trait PromStoreProtocolHandler {
97    /// Runs pre-write checks/hooks for prometheus remote write requests.
98    async fn pre_write(&self, _request: &RowInsertRequests, _ctx: QueryContextRef) -> Result<()> {
99        Ok(())
100    }
101
102    /// Handling prometheus remote write requests
103    async fn write(
104        &self,
105        request: RowInsertRequests,
106        ctx: QueryContextRef,
107        with_metric_engine: bool,
108    ) -> Result<Output>;
109
110    /// Handling prometheus remote read requests
111    async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
112    /// Handling push gateway requests
113    async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
114}
115
116#[async_trait]
117pub trait OpenTelemetryProtocolHandler: PipelineHandler {
118    /// Handling opentelemetry metrics request
119    async fn metrics(
120        &self,
121        request: ExportMetricsServiceRequest,
122        ctx: QueryContextRef,
123    ) -> Result<Output>;
124
125    /// Handling opentelemetry traces request
126    async fn traces(
127        &self,
128        pipeline_handler: PipelineHandlerRef,
129        request: ExportTraceServiceRequest,
130        pipeline: PipelineWay,
131        pipeline_params: GreptimePipelineParams,
132        table_name: String,
133        ctx: QueryContextRef,
134    ) -> Result<TraceIngestOutcome>;
135
136    async fn logs(
137        &self,
138        pipeline_handler: PipelineHandlerRef,
139        request: ExportLogsServiceRequest,
140        pipeline: PipelineWay,
141        pipeline_params: GreptimePipelineParams,
142        table_name: String,
143        ctx: QueryContextRef,
144    ) -> Result<Vec<Output>>;
145}
146
147/// PipelineHandler is responsible for handling pipeline related requests.
148///
149/// The "Pipeline" is a series of transformations that can be applied to unstructured
150/// data like logs. This handler is responsible to manage pipelines and accept data for
151/// processing.
152///
153/// The pipeline is stored in the database and can be retrieved by its name.
154#[async_trait]
155pub trait PipelineHandler {
156    async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
157
158    async fn get_pipeline(
159        &self,
160        name: &str,
161        version: PipelineVersion,
162        query_ctx: QueryContextRef,
163    ) -> Result<Arc<Pipeline>>;
164
165    async fn insert_pipeline(
166        &self,
167        name: &str,
168        content_type: &str,
169        pipeline: &str,
170        query_ctx: QueryContextRef,
171    ) -> Result<PipelineInfo>;
172
173    async fn delete_pipeline(
174        &self,
175        name: &str,
176        version: PipelineVersion,
177        query_ctx: QueryContextRef,
178    ) -> Result<Option<()>>;
179
180    async fn get_table(
181        &self,
182        table: &str,
183        query_ctx: &QueryContext,
184    ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
185
186    //// Build a pipeline from a string.
187    fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
188
189    /// Get a original pipeline by name.
190    async fn get_pipeline_str(
191        &self,
192        name: &str,
193        version: PipelineVersion,
194        query_ctx: QueryContextRef,
195    ) -> Result<(String, TimestampNanosecond)>;
196}
197
198/// Handling dashboard as code CRUD
199pub type DashboardHandlerRef = Arc<dyn DashboardHandler + Send + Sync>;
200
201#[async_trait]
202pub trait DashboardHandler {
203    async fn save(&self, name: &str, definition: &str, ctx: QueryContextRef) -> Result<()>;
204
205    async fn list(&self, ctx: QueryContextRef) -> Result<Vec<DashboardDefinition>>;
206
207    async fn delete(&self, name: &str, ctx: QueryContextRef) -> Result<()>;
208}
209
210/// Handle log query requests.
211#[async_trait]
212pub trait LogQueryHandler {
213    /// Execute a log query.
214    async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
215
216    /// Get catalog manager.
217    fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
218}
219
220/// Handle Jaeger query requests.
221#[async_trait]
222pub trait JaegerQueryHandler {
223    /// Get trace services. It's used for `/api/services` API.
224    async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
225
226    /// Get Jaeger operations. It's used for `/api/operations` and `/api/services/{service_name}/operations` API.
227    async fn get_operations(
228        &self,
229        ctx: QueryContextRef,
230        service_name: &str,
231        span_kind: Option<&str>,
232    ) -> Result<Output>;
233
234    /// Retrieves a trace by its unique identifier.
235    ///
236    /// This method is used to handle requests to the `/api/traces/{trace_id}` endpoint.
237    /// It accepts optional `start_time` and `end_time` parameters in nanoseconds to filter the trace data within a specific time range.
238    async fn get_trace(
239        &self,
240        ctx: QueryContextRef,
241        trace_id: &str,
242        start_time: Option<i64>,
243        end_time: Option<i64>,
244        limit: Option<usize>,
245    ) -> Result<Output>;
246
247    /// Find traces by query params. It's used for `/api/traces` API.
248    async fn find_traces(
249        &self,
250        ctx: QueryContextRef,
251        query_params: QueryTraceParams,
252    ) -> Result<Output>;
253}