1pub mod grpc;
27pub mod sql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::prom_store::remote::ReadRequest;
33use api::v1::RowInsertRequests;
34use async_trait::async_trait;
35use catalog::CatalogManager;
36use common_query::Output;
37use datatypes::timestamp::TimestampNanosecond;
38use headers::HeaderValue;
39use log_query::LogQuery;
40use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
41use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
42use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
43use pipeline::{GreptimePipelineParams, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
44use serde_json::Value;
45use session::context::{QueryContext, QueryContextRef};
46
47use crate::error::Result;
48use crate::http::jaeger::QueryTraceParams;
49use crate::influxdb::InfluxdbRequest;
50use crate::opentsdb::codec::DataPoint;
51use crate::prom_store::Metrics;
52pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
53pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
54pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
55pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
56pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
57pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
58pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
59
60#[async_trait]
61pub trait InfluxdbLineProtocolHandler {
62 async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
65}
66
67#[async_trait]
68pub trait OpentsdbProtocolHandler {
69 async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
72}
73
74pub struct PromStoreResponse {
75 pub content_type: HeaderValue,
76 pub content_encoding: HeaderValue,
77 pub resp_metrics: HashMap<String, Value>,
78 pub body: Vec<u8>,
79}
80
81#[async_trait]
82pub trait PromStoreProtocolHandler {
83 async fn write(
85 &self,
86 request: RowInsertRequests,
87 ctx: QueryContextRef,
88 with_metric_engine: bool,
89 ) -> Result<Output>;
90
91 async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
93 async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
95}
96
97#[async_trait]
98pub trait OpenTelemetryProtocolHandler: PipelineHandler {
99 async fn metrics(
101 &self,
102 request: ExportMetricsServiceRequest,
103 ctx: QueryContextRef,
104 ) -> Result<Output>;
105
106 async fn traces(
108 &self,
109 pipeline_handler: PipelineHandlerRef,
110 request: ExportTraceServiceRequest,
111 pipeline: PipelineWay,
112 pipeline_params: GreptimePipelineParams,
113 table_name: String,
114 ctx: QueryContextRef,
115 ) -> Result<Output>;
116
117 async fn logs(
118 &self,
119 pipeline_handler: PipelineHandlerRef,
120 request: ExportLogsServiceRequest,
121 pipeline: PipelineWay,
122 pipeline_params: GreptimePipelineParams,
123 table_name: String,
124 ctx: QueryContextRef,
125 ) -> Result<Output>;
126}
127
128#[async_trait]
136pub trait PipelineHandler {
137 async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
138
139 async fn get_pipeline(
140 &self,
141 name: &str,
142 version: PipelineVersion,
143 query_ctx: QueryContextRef,
144 ) -> Result<Arc<Pipeline>>;
145
146 async fn insert_pipeline(
147 &self,
148 name: &str,
149 content_type: &str,
150 pipeline: &str,
151 query_ctx: QueryContextRef,
152 ) -> Result<PipelineInfo>;
153
154 async fn delete_pipeline(
155 &self,
156 name: &str,
157 version: PipelineVersion,
158 query_ctx: QueryContextRef,
159 ) -> Result<Option<()>>;
160
161 async fn get_table(
162 &self,
163 table: &str,
164 query_ctx: &QueryContext,
165 ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
166
167 fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
169
170 async fn get_pipeline_str(
172 &self,
173 name: &str,
174 version: PipelineVersion,
175 query_ctx: QueryContextRef,
176 ) -> Result<(String, TimestampNanosecond)>;
177}
178
179#[async_trait]
181pub trait LogQueryHandler {
182 async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
184
185 fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
187}
188
189#[async_trait]
191pub trait JaegerQueryHandler {
192 async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
194
195 async fn get_operations(
197 &self,
198 ctx: QueryContextRef,
199 service_name: &str,
200 span_kind: Option<&str>,
201 start_time: Option<i64>,
202 end_time: Option<i64>,
203 ) -> Result<Output>;
204
205 async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
207
208 async fn find_traces(
210 &self,
211 ctx: QueryContextRef,
212 query_params: QueryTraceParams,
213 ) -> Result<Output>;
214}