1pub mod grpc;
27pub mod sql;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use api::prom_store::remote::ReadRequest;
33use api::v1::RowInsertRequests;
34use async_trait::async_trait;
35use catalog::CatalogManager;
36use common_query::Output;
37use datatypes::timestamp::TimestampNanosecond;
38use headers::HeaderValue;
39use log_query::LogQuery;
40use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
41use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
42use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
43use pipeline::{GreptimePipelineParams, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
44use serde_json::Value;
45use session::context::{QueryContext, QueryContextRef};
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48pub struct DashboardDefinition {
49 pub name: String,
50 pub definition: String,
51}
52
53use crate::error::Result;
54use crate::http::jaeger::QueryTraceParams;
55use crate::influxdb::InfluxdbRequest;
56use crate::opentsdb::codec::DataPoint;
57use crate::prom_store::Metrics;
58pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
59pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
60pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
61pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
62pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
63pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
64pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
65
/// Result summary of a trace ingestion, as returned by
/// [`OpenTelemetryProtocolHandler::traces`].
///
/// The `Default` value has every counter at zero and no error message. The type is
/// comparable (`PartialEq`/`Eq`) so outcomes can be checked directly in assertions.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TraceIngestOutcome {
    /// Cost attributed to the write.
    /// NOTE(review): the unit is not defined by this type — confirm with implementers.
    pub write_cost: usize,
    /// Count of spans reported as accepted.
    pub accepted_spans: usize,
    /// Count of spans reported as rejected.
    pub rejected_spans: usize,
    /// Optional error description; `None` when no message is attached.
    pub error_message: Option<String>,
}
73
74#[async_trait]
75pub trait InfluxdbLineProtocolHandler {
76 async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
79}
80
81#[async_trait]
82pub trait OpentsdbProtocolHandler {
83 async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
86}
87
88pub struct PromStoreResponse {
89 pub content_type: HeaderValue,
90 pub content_encoding: HeaderValue,
91 pub resp_metrics: HashMap<String, Value>,
92 pub body: Vec<u8>,
93}
94
95#[async_trait]
96pub trait PromStoreProtocolHandler {
97 async fn pre_write(&self, _request: &RowInsertRequests, _ctx: QueryContextRef) -> Result<()> {
99 Ok(())
100 }
101
102 async fn write(
104 &self,
105 request: RowInsertRequests,
106 ctx: QueryContextRef,
107 with_metric_engine: bool,
108 ) -> Result<Output>;
109
110 async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
112 async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
114}
115
116#[async_trait]
117pub trait OpenTelemetryProtocolHandler: PipelineHandler {
118 async fn metrics(
120 &self,
121 request: ExportMetricsServiceRequest,
122 ctx: QueryContextRef,
123 ) -> Result<Output>;
124
125 async fn traces(
127 &self,
128 pipeline_handler: PipelineHandlerRef,
129 request: ExportTraceServiceRequest,
130 pipeline: PipelineWay,
131 pipeline_params: GreptimePipelineParams,
132 table_name: String,
133 ctx: QueryContextRef,
134 ) -> Result<TraceIngestOutcome>;
135
136 async fn logs(
137 &self,
138 pipeline_handler: PipelineHandlerRef,
139 request: ExportLogsServiceRequest,
140 pipeline: PipelineWay,
141 pipeline_params: GreptimePipelineParams,
142 table_name: String,
143 ctx: QueryContextRef,
144 ) -> Result<Vec<Output>>;
145}
146
147#[async_trait]
155pub trait PipelineHandler {
156 async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
157
158 async fn get_pipeline(
159 &self,
160 name: &str,
161 version: PipelineVersion,
162 query_ctx: QueryContextRef,
163 ) -> Result<Arc<Pipeline>>;
164
165 async fn insert_pipeline(
166 &self,
167 name: &str,
168 content_type: &str,
169 pipeline: &str,
170 query_ctx: QueryContextRef,
171 ) -> Result<PipelineInfo>;
172
173 async fn delete_pipeline(
174 &self,
175 name: &str,
176 version: PipelineVersion,
177 query_ctx: QueryContextRef,
178 ) -> Result<Option<()>>;
179
180 async fn get_table(
181 &self,
182 table: &str,
183 query_ctx: &QueryContext,
184 ) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
185
186 fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
188
189 async fn get_pipeline_str(
191 &self,
192 name: &str,
193 version: PipelineVersion,
194 query_ctx: QueryContextRef,
195 ) -> Result<(String, TimestampNanosecond)>;
196}
197
198pub type DashboardHandlerRef = Arc<dyn DashboardHandler + Send + Sync>;
200
201#[async_trait]
202pub trait DashboardHandler {
203 async fn save(&self, name: &str, definition: &str, ctx: QueryContextRef) -> Result<()>;
204
205 async fn list(&self, ctx: QueryContextRef) -> Result<Vec<DashboardDefinition>>;
206
207 async fn delete(&self, name: &str, ctx: QueryContextRef) -> Result<()>;
208}
209
210#[async_trait]
212pub trait LogQueryHandler {
213 async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
215
216 fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
218}
219
220#[async_trait]
222pub trait JaegerQueryHandler {
223 async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
225
226 async fn get_operations(
228 &self,
229 ctx: QueryContextRef,
230 service_name: &str,
231 span_kind: Option<&str>,
232 ) -> Result<Output>;
233
234 async fn get_trace(
239 &self,
240 ctx: QueryContextRef,
241 trace_id: &str,
242 start_time: Option<i64>,
243 end_time: Option<i64>,
244 limit: Option<usize>,
245 ) -> Result<Output>;
246
247 async fn find_traces(
249 &self,
250 ctx: QueryContextRef,
251 query_params: QueryTraceParams,
252 ) -> Result<Output>;
253}