servers/http/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::Extension;
19use axum::body::Bytes;
20use axum::extract::{Query, State};
21use axum::http::{HeaderValue, StatusCode, header};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use hyper::HeaderMap;
28use pipeline::PipelineDefinition;
29use pipeline::util::to_pipeline_version;
30use prost::Message;
31use serde::{Deserialize, Serialize};
32use session::context::{Channel, QueryContext};
33use snafu::prelude::*;
34
35use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
36use crate::http::extractor::PipelineInfo;
37use crate::http::header::{GREPTIME_DB_HEADER_METRICS, write_cost_header_map};
38use crate::prom_remote_write::decode::PromSeriesProcessor;
39use crate::prom_remote_write::decode_remote_write_request;
40use crate::prom_remote_write::validation::PromValidationMode;
41use crate::prom_store::{extract_schema_from_read_request, snappy_decompress};
42use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
43
/// Query-context extension key under which `remote_write` stores the value of the
/// `physical_table` query parameter.
pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
/// The `snappy` encoding name.
// NOTE(review): not referenced by the handlers in this file; presumably the default
// content encoding for Prometheus remote requests — confirm against callers.
pub const DEFAULT_ENCODING: &str = "snappy";
/// Content-encoding token that `remote_write` looks for in the `Content-Encoding` header
/// before decoding the body (VictoriaMetrics-style requests).
pub const VM_ENCODING: &str = "zstd";
/// Response body returned by `remote_write` when the `get_vm_proto_version` query
/// parameter is present.
pub const VM_PROTO_VERSION: &str = "1";
48
/// Shared axum state for the Prometheus remote write / remote read HTTP handlers.
#[derive(Clone)]
pub struct PromStoreState {
    /// Handler that performs the actual writes (`remote_write`) and reads (`remote_read`).
    pub prom_store_handler: PromStoreProtocolHandlerRef,
    /// Optional pipeline handler; `remote_write` fails with an internal error when a
    /// request names a pipeline while this is `None`.
    pub pipeline_handler: Option<PipelineHandlerRef>,
    /// Flag forwarded unchanged to `prom_store_handler.write` on every write.
    pub prom_store_with_metric_engine: bool,
    /// Validation mode forwarded to `decode_remote_write_request`.
    pub prom_validation_mode: PromValidationMode,
}
56
/// Query-string parameters accepted by the remote write and remote read handlers.
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoteWriteQuery {
    /// Database name. The handlers use it as the label value of their metrics and fall
    /// back to an empty string when it is absent.
    pub db: Option<String>,
    /// Specify which physical table to use for storing metrics.
    /// This only works on remote write requests.
    pub physical_table: Option<String>,
    /// For VictoriaMetrics modified remote write protocol.
    /// When present (with any value), `remote_write` replies with the protocol version
    /// instead of processing the body.
    pub get_vm_proto_version: Option<String>,
}
66
67impl Default for RemoteWriteQuery {
68    fn default() -> RemoteWriteQuery {
69        Self {
70            db: Some(DEFAULT_SCHEMA_NAME.to_string()),
71            physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
72            get_vm_proto_version: None,
73        }
74    }
75}
76
77#[axum_macros::debug_handler]
78#[tracing::instrument(
79    skip_all,
80    fields(protocol = "prometheus", request_type = "remote_write")
81)]
82pub async fn remote_write(
83    State(state): State<PromStoreState>,
84    Query(params): Query<RemoteWriteQuery>,
85    Extension(mut query_ctx): Extension<QueryContext>,
86    pipeline_info: PipelineInfo,
87    content_encoding: TypedHeader<headers::ContentEncoding>,
88    body: Bytes,
89) -> Result<impl IntoResponse> {
90    let PromStoreState {
91        prom_store_handler,
92        pipeline_handler,
93        prom_store_with_metric_engine,
94        prom_validation_mode,
95    } = state;
96
97    if let Some(_vm_handshake) = params.get_vm_proto_version {
98        return Ok(VM_PROTO_VERSION.into_response());
99    }
100
101    let db = params.db.clone().unwrap_or_default();
102    query_ctx.set_channel(Channel::Prometheus);
103    if let Some(physical_table) = params.physical_table {
104        query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
105    }
106    let query_ctx = Arc::new(query_ctx);
107    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
108        .with_label_values(&[db.as_str()])
109        .start_timer();
110
111    let is_zstd = content_encoding.contains(VM_ENCODING);
112
113    let mut processor = PromSeriesProcessor::default_processor();
114
115    if let Some(pipeline_name) = pipeline_info.pipeline_name {
116        let pipeline_def = PipelineDefinition::from_name(
117            &pipeline_name,
118            to_pipeline_version(pipeline_info.pipeline_version.as_deref())
119                .context(PipelineSnafu)?,
120            None,
121        )
122        .context(PipelineSnafu)?;
123        let pipeline_handler = pipeline_handler.context(InternalSnafu {
124            err_msg: "pipeline handler is not set".to_string(),
125        })?;
126
127        processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
128    }
129
130    let mut req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor)?;
131
132    let req = if processor.use_pipeline {
133        processor.exec_pipeline().await?
134    } else {
135        req.as_insert_requests()
136    };
137
138    let mut cost = 0;
139    for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
140        let cnt: u64 = reqs
141            .inserts
142            .iter()
143            .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
144            .sum();
145        let output = prom_store_handler
146            .write(reqs, temp_ctx, prom_store_with_metric_engine)
147            .await?;
148        crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
149            .with_label_values(&[db.as_str()])
150            .inc_by(cnt);
151        cost += output.meta.cost;
152    }
153
154    Ok((StatusCode::NO_CONTENT, write_cost_header_map(cost)).into_response())
155}
156
157impl IntoResponse for PromStoreResponse {
158    fn into_response(self) -> axum::response::Response {
159        let mut header_map = HeaderMap::new();
160        header_map.insert(&header::CONTENT_TYPE, self.content_type);
161        header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
162
163        let metrics = if self.resp_metrics.is_empty() {
164            None
165        } else {
166            serde_json::to_string(&self.resp_metrics).ok()
167        };
168        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
169            header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
170        }
171
172        (header_map, self.body).into_response()
173    }
174}
175
176#[axum_macros::debug_handler]
177#[tracing::instrument(
178    skip_all,
179    fields(protocol = "prometheus", request_type = "remote_read")
180)]
181pub async fn remote_read(
182    State(state): State<PromStoreState>,
183    Query(params): Query<RemoteWriteQuery>,
184    Extension(mut query_ctx): Extension<QueryContext>,
185    body: Bytes,
186) -> Result<PromStoreResponse> {
187    let db = params.db.clone().unwrap_or_default();
188    query_ctx.set_channel(Channel::Prometheus);
189
190    let request = decode_remote_read_request(body).await?;
191
192    // Extract schema from special labels and set it in query context
193    if let Some(schema) = extract_schema_from_read_request(&request) {
194        query_ctx.set_current_schema(&schema);
195    }
196
197    let query_ctx = Arc::new(query_ctx);
198    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
199        .with_label_values(&[db.as_str()])
200        .start_timer();
201
202    state.prom_store_handler.read(request, query_ctx).await
203}
204
205async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
206    let buf = snappy_decompress(&body[..])?;
207
208    ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
209}