pub mod grpc;
pub mod sql;
use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use catalog::CatalogManager;
use common_query::Output;
use headers::HeaderValue;
use log_query::LogQuery;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
use serde_json::Value;
use session::context::{QueryContext, QueryContextRef};
use crate::error::Result;
use crate::http::jaeger::QueryTraceParams;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prom_store::Metrics;
// Shared-ownership aliases for the handler traits declared in this module.
// Each alias is an `Arc` around a trait object that is additionally bounded
// by `Send + Sync`, so a handle can be cloned and moved across threads.

/// Reference-counted trait object for an [`OpentsdbProtocolHandler`].
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
/// Reference-counted trait object for an [`InfluxdbLineProtocolHandler`].
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
/// Reference-counted trait object for a [`PromStoreProtocolHandler`].
pub type PromStoreProtocolHandlerRef = Arc<dyn PromStoreProtocolHandler + Send + Sync>;
/// Reference-counted trait object for an [`OpenTelemetryProtocolHandler`].
pub type OpenTelemetryProtocolHandlerRef = Arc<dyn OpenTelemetryProtocolHandler + Send + Sync>;
/// Reference-counted trait object for a [`PipelineHandler`].
pub type PipelineHandlerRef = Arc<dyn PipelineHandler + Send + Sync>;
/// Reference-counted trait object for a [`LogQueryHandler`].
pub type LogQueryHandlerRef = Arc<dyn LogQueryHandler + Send + Sync>;
/// Reference-counted trait object for a [`JaegerQueryHandler`].
pub type JaegerQueryHandlerRef = Arc<dyn JaegerQueryHandler + Send + Sync>;
/// Handler interface for InfluxDB line protocol requests.
#[async_trait]
pub trait InfluxdbLineProtocolHandler {
/// Executes an [`InfluxdbRequest`] under the given query context.
///
/// Returns the resulting [`Output`], or an error from this crate's
/// [`Result`] alias.
// NOTE(review): presumably this ingests the lines carried by the request;
// the trait itself does not constrain what implementors do — confirm.
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
}
/// Handler interface for OpenTSDB data points.
#[async_trait]
pub trait OpentsdbProtocolHandler {
/// Processes a batch of OpenTSDB [`DataPoint`]s under the given query context.
///
/// Takes ownership of `data_points` and returns a `usize` on success.
// NOTE(review): the returned count is presumably the number of data points
// (or rows) accepted — verify against implementors.
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
}
/// Response value returned by [`PromStoreProtocolHandler::read`].
///
/// Bundles two header values, a map of JSON values and a raw byte body.
pub struct PromStoreResponse {
/// Header value describing the content type of `body`.
pub content_type: HeaderValue,
/// Header value describing the content encoding of `body`.
pub content_encoding: HeaderValue,
/// Named JSON values accompanying the response.
// NOTE(review): presumably per-query execution metrics — confirm with the
// code that populates this map.
pub resp_metrics: HashMap<String, Value>,
/// Raw response payload bytes.
pub body: Vec<u8>,
}
/// Handler interface for the Prometheus remote storage protocol: writes,
/// reads and metric ingestion.
#[async_trait]
pub trait PromStoreProtocolHandler {
/// Handles a write expressed as [`RowInsertRequests`] under the given
/// query context and returns the resulting [`Output`].
// NOTE(review): `with_metric_engine` presumably selects whether the write is
// routed through the metric engine — confirm against implementors.
async fn write(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> Result<Output>;
/// Handles a remote [`ReadRequest`] under the given query context and
/// returns a [`PromStoreResponse`].
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
/// Ingests the given [`Metrics`]; no query context is supplied and no
/// value is returned on success.
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}
/// Handler interface for OpenTelemetry export requests (metrics, traces
/// and logs).
///
/// [`PipelineHandler`] is a supertrait, so every implementor must also
/// provide the pipeline operations.
#[async_trait]
pub trait OpenTelemetryProtocolHandler: PipelineHandler {
/// Handles an OTLP metrics export request under the given query context.
async fn metrics(
&self,
request: ExportMetricsServiceRequest,
ctx: QueryContextRef,
) -> Result<Output>;
/// Handles an OTLP trace export request.
///
/// Besides the request and query context, callers pass a pipeline handler,
/// a [`PipelineWay`], pipeline parameters and a table name.
// NOTE(review): `table_name` is presumably the destination table and
// `pipeline` the processing strategy — confirm against implementors.
// `pipeline_handler` is passed explicitly even though `Self` is already a
// `PipelineHandler`; the reason is not visible from this file.
async fn traces(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportTraceServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> Result<Output>;
/// Handles an OTLP logs export request.
///
/// Takes the same auxiliary arguments as [`Self::traces`]: a pipeline
/// handler, a [`PipelineWay`], pipeline parameters and a table name.
// NOTE(review): same assumptions as for `traces` — confirm.
async fn logs(
&self,
pipeline_handler: PipelineHandlerRef,
request: ExportLogsServiceRequest,
pipeline: PipelineWay,
pipeline_params: GreptimePipelineParams,
table_name: String,
ctx: QueryContextRef,
) -> Result<Output>;
}
/// Interface for pipeline management (get / insert / delete / build) plus
/// row insertion and table lookup.
#[async_trait]
pub trait PipelineHandler {
/// Inserts the given [`RowInsertRequests`] under the query context and
/// returns the resulting [`Output`].
async fn insert(&self, input: RowInsertRequests, ctx: QueryContextRef) -> Result<Output>;
/// Looks up a pipeline by `name` and `version`, returning it behind an `Arc`.
async fn get_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Arc<Pipeline>>;
/// Stores a pipeline under `name` and returns its [`PipelineInfo`].
// NOTE(review): `pipeline` is presumably the pipeline definition text and
// `content_type` its format — confirm against implementors.
async fn insert_pipeline(
&self,
name: &str,
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
) -> Result<PipelineInfo>;
/// Deletes the pipeline identified by `name` and `version`.
// NOTE(review): `Ok(None)` presumably means nothing matched — confirm.
async fn delete_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>>;
/// Looks up a table by name, returning `None` inside `Ok` when the lookup
/// yields no table.
///
/// Unlike the other methods on this trait, the error type here is
/// `catalog::error::Error` rather than this crate's [`Result`] alias, and
/// the query context is taken by reference.
async fn get_table(
&self,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
/// Builds a [`Pipeline`] from the given string.
///
/// This is the only synchronous method on the trait and takes no query
/// context.
fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline>;
}
/// Handler interface for log queries.
#[async_trait]
pub trait LogQueryHandler {
/// Executes a [`LogQuery`] under the given query context and returns the
/// resulting [`Output`].
async fn query(&self, query: LogQuery, ctx: QueryContextRef) -> Result<Output>;
/// Returns a borrowed [`CatalogManager`] for the given query context.
///
/// Synchronous and fallible; the returned reference borrows from `self`
/// or `ctx` per lifetime elision on the signature.
fn catalog_manager(&self, ctx: &QueryContext) -> Result<&dyn CatalogManager>;
}
/// Handler interface backing Jaeger-style trace queries.
///
/// Every method takes the query context first and returns an [`Output`].
#[async_trait]
pub trait JaegerQueryHandler {
/// Queries services.
// NOTE(review): presumably lists all known service names — confirm.
async fn get_services(&self, ctx: QueryContextRef) -> Result<Output>;
/// Queries operations for `service_name`, with an optional `span_kind`.
// NOTE(review): `span_kind` presumably filters operations when `Some` —
// confirm against implementors.
async fn get_operations(
&self,
ctx: QueryContextRef,
service_name: &str,
span_kind: Option<&str>,
) -> Result<Output>;
/// Queries a trace by its `trace_id`.
async fn get_trace(&self, ctx: QueryContextRef, trace_id: &str) -> Result<Output>;
/// Queries traces using the given [`QueryTraceParams`].
async fn find_traces(
&self,
ctx: QueryContextRef,
query_params: QueryTraceParams,
) -> Result<Output>;
}