Skip to main content

servers/http/
prom_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::prom_store::remote::ReadRequest;
18use axum::Extension;
19use axum::body::Bytes;
20use axum::extract::{Query, State};
21use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_catalog::consts::DEFAULT_SCHEMA_NAME;
25use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
26use common_telemetry::tracing;
27use mime_guess::mime;
28use pipeline::util::to_pipeline_version;
29use pipeline::{ContextReq, PipelineDefinition};
30use prometheus::HistogramTimer;
31use prost::Message;
32use serde::{Deserialize, Serialize};
33use session::context::{Channel, QueryContext};
34use snafu::prelude::*;
35use table::requests::{
36    METADATA_QUALITY_INFERRED, SEMANTIC_METRIC_METADATA_QUALITY, SEMANTIC_SIGNAL_TYPE,
37    SEMANTIC_SOURCE, SEMANTIC_SOURCE_VERSION, SIGNAL_TYPE_METRIC, SOURCE_PROMETHEUS,
38};
39
40use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
41use crate::http::extractor::PipelineInfo;
42use crate::http::header::{
43    CONTENT_TYPE_PROTOBUF_STR, GREPTIME_DB_HEADER_METRICS, write_cost_header_map,
44};
45use crate::pending_rows_batcher::PendingRowsBatcher;
46use crate::prom_remote_write::decode::PromSeriesProcessor;
47use crate::prom_remote_write::decode_remote_write_request;
48use crate::prom_remote_write::v2::{RemoteWriteV2RequestExt, decode_remote_write_v2_request};
49use crate::prom_remote_write::validation::PromValidationMode;
50use crate::prom_store::{extract_schema_from_read_request, snappy_decompress};
51use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
52
53pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
54pub const DEFAULT_ENCODING: &str = "snappy";
55pub const VM_ENCODING: &str = "zstd";
56pub const VM_PROTO_VERSION: &str = "1";
57const REMOTE_WRITE_V1_VERSION: &str = "1.0";
58const REMOTE_WRITE_V2_VERSION: &str = "2.0";
59const REMOTE_WRITE_V1_PROTO: &str = "prometheus.WriteRequest";
60const REMOTE_WRITE_V2_PROTO: &str = "io.prometheus.write.v2.Request";
61const CONTENT_TYPE_PROTO_PARAM: &str = "proto";
62const REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER: &str = "x-prometheus-remote-write-samples-written";
63const REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER: &str =
64    "x-prometheus-remote-write-histograms-written";
65const REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER: &str =
66    "x-prometheus-remote-write-exemplars-written";
67
68#[derive(Clone)]
69pub struct PromStoreState {
70    pub prom_store_handler: PromStoreProtocolHandlerRef,
71    pub pipeline_handler: Option<PipelineHandlerRef>,
72    pub prom_store_with_metric_engine: bool,
73    pub prom_validation_mode: PromValidationMode,
74    pub pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
75}
76
77#[derive(Debug, Serialize, Deserialize)]
78pub struct RemoteWriteQuery {
79    pub db: Option<String>,
80    /// Specify which physical table to use for storing metrics.
81    /// This only works on remote write requests.
82    pub physical_table: Option<String>,
83    /// For VictoriaMetrics modified remote write protocol
84    pub get_vm_proto_version: Option<String>,
85}
86
87impl Default for RemoteWriteQuery {
88    fn default() -> RemoteWriteQuery {
89        Self {
90            db: Some(DEFAULT_SCHEMA_NAME.to_string()),
91            physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
92            get_vm_proto_version: None,
93        }
94    }
95}
96
97#[axum_macros::debug_handler]
98#[tracing::instrument(
99    skip_all,
100    fields(protocol = "prometheus", request_type = "remote_write")
101)]
102pub async fn remote_write(
103    State(state): State<PromStoreState>,
104    Query(params): Query<RemoteWriteQuery>,
105    Extension(query_ctx): Extension<QueryContext>,
106    content_type: Option<TypedHeader<headers::ContentType>>,
107    pipeline_info: PipelineInfo,
108    content_encoding: TypedHeader<headers::ContentEncoding>,
109    body: Bytes,
110) -> Result<axum::response::Response> {
111    let is_zstd = content_encoding.contains(VM_ENCODING);
112
113    match remote_write_proto(content_type) {
114        RemoteWriteProto::V1 => {
115            remote_write_v1(state, params, query_ctx, pipeline_info, is_zstd, body).await
116        }
117        RemoteWriteProto::V2 => {
118            if let Some(response) = unsupported_remote_write_v2_encoding_response(&content_encoding)
119            {
120                return Ok(response);
121            }
122            remote_write_v2(state, params, query_ctx, pipeline_info, is_zstd, body).await
123        }
124        RemoteWriteProto::Unsupported(content_type) => Ok((
125            StatusCode::UNSUPPORTED_MEDIA_TYPE,
126            format!("unsupported prometheus remote write content type: {content_type}"),
127        )
128            .into_response()),
129    }
130}
131
132async fn remote_write_v1(
133    state: PromStoreState,
134    params: RemoteWriteQuery,
135    query_ctx: QueryContext,
136    pipeline_info: PipelineInfo,
137    is_zstd: bool,
138    body: Bytes,
139) -> Result<axum::response::Response> {
140    let PromStoreState {
141        prom_store_handler,
142        pipeline_handler,
143        prom_store_with_metric_engine,
144        prom_validation_mode,
145        pending_rows_batcher,
146    } = state;
147
148    if let Some(response) = vm_proto_version_response(&params) {
149        return Ok(response);
150    }
151
152    let (db, query_ctx, _timer) =
153        prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V1_VERSION);
154
155    let mut processor = PromSeriesProcessor::default_processor();
156
157    if let Some(pipeline_name) = pipeline_info.pipeline_name {
158        let pipeline_def = PipelineDefinition::from_name(
159            &pipeline_name,
160            to_pipeline_version(pipeline_info.pipeline_version.as_deref())
161                .context(PipelineSnafu)?,
162            None,
163        )
164        .context(PipelineSnafu)?;
165        let pipeline_handler = pipeline_handler.context(InternalSnafu {
166            err_msg: "pipeline handler is not set".to_string(),
167        })?;
168
169        processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
170    }
171
172    let mut req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor)?;
173
174    let req = if processor.use_pipeline {
175        processor.exec_pipeline().await?
176    } else {
177        req.as_insert_requests()
178    };
179
180    let outcome = write_prometheus_rows(
181        prom_store_handler,
182        pending_rows_batcher,
183        prom_store_with_metric_engine,
184        &db,
185        query_ctx,
186        req,
187    )
188    .await?;
189
190    Ok((
191        StatusCode::NO_CONTENT,
192        write_cost_header_map(outcome.write_cost),
193    )
194        .into_response())
195}
196
197async fn remote_write_v2(
198    state: PromStoreState,
199    params: RemoteWriteQuery,
200    query_ctx: QueryContext,
201    pipeline_info: PipelineInfo,
202    is_zstd: bool,
203    body: Bytes,
204) -> Result<axum::response::Response> {
205    let PromStoreState {
206        prom_store_handler,
207        pipeline_handler: _,
208        prom_store_with_metric_engine,
209        prom_validation_mode: _,
210        pending_rows_batcher,
211    } = state;
212
213    if let Some(response) = vm_proto_version_response(&params) {
214        return Ok(response);
215    }
216
217    // Pipeline processing is not supported for remote write v2 yet. Ignore the
218    // optional pipeline parameter and ingest samples directly.
219    let _ = pipeline_info;
220
221    let (db, query_ctx, _timer) =
222        prepare_remote_write_context(&params, query_ctx, REMOTE_WRITE_V2_VERSION);
223
224    let request = match decode_remote_write_v2_request(is_zstd, body) {
225        Ok(request) => request,
226        Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
227    };
228    let req = match request.into_context_req() {
229        Ok(req) => req,
230        Err(error) => return Ok(remote_write_v2_error_response(error, 0, 0, 0)),
231    };
232
233    let outcome = match write_prometheus_rows_with_progress(
234        prom_store_handler,
235        pending_rows_batcher,
236        prom_store_with_metric_engine,
237        &db,
238        query_ctx,
239        req,
240    )
241    .await
242    {
243        Ok(outcome) => outcome,
244        Err(error) => {
245            return Ok(remote_write_v2_error_response(
246                error.error,
247                error.rows_written,
248                0,
249                0,
250            ));
251        }
252    };
253
254    let mut headers = write_cost_header_map(outcome.write_cost);
255    append_remote_write_v2_written_headers(&mut headers, outcome.rows_written, 0, 0);
256
257    Ok((StatusCode::NO_CONTENT, headers).into_response())
258}
259
260fn vm_proto_version_response(params: &RemoteWriteQuery) -> Option<axum::response::Response> {
261    params
262        .get_vm_proto_version
263        .as_ref()
264        .map(|_| VM_PROTO_VERSION.into_response())
265}
266
267fn prepare_remote_write_context(
268    params: &RemoteWriteQuery,
269    mut query_ctx: QueryContext,
270    remote_write_version: &str,
271) -> (String, Arc<QueryContext>, HistogramTimer) {
272    let db = params.db.clone().unwrap_or_default();
273    query_ctx.set_channel(Channel::Prometheus);
274    let physical_table = params
275        .physical_table
276        .clone()
277        .unwrap_or_else(|| GREPTIME_PHYSICAL_TABLE.to_string());
278    query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
279    // Stamp the Prometheus metric identity here, before `as_req_iter` splits into the
280    // batched and direct write paths, so both inherit it (the batched path bypasses
281    // `PromStoreProtocolHandler::write`). Prometheus remote-write metadata is weak
282    // here, so the type is inferred from naming.
283    query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_METRIC);
284    query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_PROMETHEUS);
285    query_ctx.set_extension(SEMANTIC_SOURCE_VERSION, remote_write_version);
286    query_ctx.set_extension(SEMANTIC_METRIC_METADATA_QUALITY, METADATA_QUALITY_INFERRED);
287    let query_ctx = Arc::new(query_ctx);
288    let timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
289        .with_label_values(&[db.as_str()])
290        .start_timer();
291
292    (db, query_ctx, timer)
293}
294
295struct PromWriteOutcome {
296    write_cost: usize,
297    rows_written: u64,
298}
299
300struct PromWriteError {
301    error: error::Error,
302    rows_written: u64,
303}
304
305async fn write_prometheus_rows(
306    prom_store_handler: PromStoreProtocolHandlerRef,
307    pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
308    prom_store_with_metric_engine: bool,
309    db: &str,
310    query_ctx: Arc<QueryContext>,
311    req: ContextReq,
312) -> Result<PromWriteOutcome> {
313    write_prometheus_rows_with_progress(
314        prom_store_handler,
315        pending_rows_batcher,
316        prom_store_with_metric_engine,
317        db,
318        query_ctx,
319        req,
320    )
321    .await
322    .map_err(|error| error.error)
323}
324
325async fn write_prometheus_rows_with_progress(
326    prom_store_handler: PromStoreProtocolHandlerRef,
327    pending_rows_batcher: Option<Arc<PendingRowsBatcher>>,
328    prom_store_with_metric_engine: bool,
329    db: &str,
330    query_ctx: Arc<QueryContext>,
331    req: ContextReq,
332) -> std::result::Result<PromWriteOutcome, PromWriteError> {
333    if prom_store_with_metric_engine && let Some(batcher) = pending_rows_batcher {
334        let mut rows_written = 0;
335        for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
336            prom_store_handler
337                .pre_write(&reqs, temp_ctx.clone())
338                .await
339                .map_err(|error| PromWriteError {
340                    error,
341                    rows_written,
342                })?;
343            let rows = batcher
344                .submit(reqs, temp_ctx)
345                .await
346                .map_err(|error| PromWriteError {
347                    error,
348                    rows_written,
349                })?;
350            crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
351                .with_label_values(&[db])
352                .inc_by(rows);
353            rows_written += rows;
354        }
355        return Ok(PromWriteOutcome {
356            write_cost: 0,
357            rows_written,
358        });
359    }
360
361    let mut write_cost = 0;
362    let mut rows_written = 0;
363    for (temp_ctx, reqs) in req.as_req_iter(query_ctx) {
364        let cnt: u64 = reqs
365            .inserts
366            .iter()
367            .filter_map(|s| s.rows.as_ref().map(|r| r.rows.len() as u64))
368            .sum();
369        let output = prom_store_handler
370            .write(reqs, temp_ctx, prom_store_with_metric_engine)
371            .await
372            .map_err(|error| PromWriteError {
373                error,
374                rows_written,
375            })?;
376        crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
377            .with_label_values(&[db])
378            .inc_by(cnt);
379        write_cost += output.meta.cost;
380        rows_written += cnt;
381    }
382
383    Ok(PromWriteOutcome {
384        write_cost,
385        rows_written,
386    })
387}
388
389fn remote_write_v2_error_response(
390    error: error::Error,
391    samples: u64,
392    histograms: u64,
393    exemplars: u64,
394) -> axum::response::Response {
395    let mut response = error.into_response();
396    append_remote_write_v2_written_headers(response.headers_mut(), samples, histograms, exemplars);
397    response
398}
399
400fn append_remote_write_v2_written_headers(
401    headers: &mut HeaderMap,
402    samples: u64,
403    histograms: u64,
404    exemplars: u64,
405) {
406    headers.insert(
407        REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER,
408        HeaderValue::from_str(&samples.to_string()).expect("u64 header value is valid"),
409    );
410    headers.insert(
411        REMOTE_WRITE_V2_HISTOGRAMS_WRITTEN_HEADER,
412        HeaderValue::from_str(&histograms.to_string()).expect("u64 header value is valid"),
413    );
414    headers.insert(
415        REMOTE_WRITE_V2_EXEMPLARS_WRITTEN_HEADER,
416        HeaderValue::from_str(&exemplars.to_string()).expect("u64 header value is valid"),
417    );
418}
419
420enum RemoteWriteProto {
421    V1,
422    V2,
423    Unsupported(mime::Mime),
424}
425
426// ref: https://github.com/prometheus/client_golang/blob/74560058a7af7a695db8196c8e84a0754032c6af/exp/api/remote/remote_api.go#L544
427fn remote_write_proto(content_type: Option<TypedHeader<headers::ContentType>>) -> RemoteWriteProto {
428    let Some(TypedHeader(content_type)) = content_type else {
429        return RemoteWriteProto::V1;
430    };
431
432    let mime_type: mime::Mime = content_type.into();
433    if !mime_type
434        .essence_str()
435        .eq_ignore_ascii_case(CONTENT_TYPE_PROTOBUF_STR)
436    {
437        return RemoteWriteProto::Unsupported(mime_type);
438    }
439
440    for (name, value) in mime_type.params() {
441        if !name.as_str().eq_ignore_ascii_case(CONTENT_TYPE_PROTO_PARAM) {
442            continue;
443        }
444
445        return match value.as_str() {
446            REMOTE_WRITE_V1_PROTO => RemoteWriteProto::V1,
447            REMOTE_WRITE_V2_PROTO => RemoteWriteProto::V2,
448            _ => RemoteWriteProto::Unsupported(mime_type.clone()),
449        };
450    }
451
452    RemoteWriteProto::V1
453}
454
455fn unsupported_remote_write_v2_encoding_response(
456    content_encoding: &headers::ContentEncoding,
457) -> Option<axum::response::Response> {
458    if content_encoding.contains(DEFAULT_ENCODING) || content_encoding.contains(VM_ENCODING) {
459        return None;
460    }
461
462    Some((
463        StatusCode::UNSUPPORTED_MEDIA_TYPE,
464        format!(
465            "unsupported prometheus remote write content encoding: only {DEFAULT_ENCODING} and {VM_ENCODING} are supported"
466        ),
467    )
468        .into_response())
469}
470
471impl IntoResponse for PromStoreResponse {
472    fn into_response(self) -> axum::response::Response {
473        let mut header_map = HeaderMap::new();
474        header_map.insert(&header::CONTENT_TYPE, self.content_type);
475        header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
476
477        let metrics = if self.resp_metrics.is_empty() {
478            None
479        } else {
480            serde_json::to_string(&self.resp_metrics).ok()
481        };
482        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
483            header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
484        }
485
486        (header_map, self.body).into_response()
487    }
488}
489
490#[axum_macros::debug_handler]
491#[tracing::instrument(
492    skip_all,
493    fields(protocol = "prometheus", request_type = "remote_read")
494)]
495pub async fn remote_read(
496    State(state): State<PromStoreState>,
497    Query(params): Query<RemoteWriteQuery>,
498    Extension(mut query_ctx): Extension<QueryContext>,
499    body: Bytes,
500) -> Result<PromStoreResponse> {
501    let db = params.db.clone().unwrap_or_default();
502    query_ctx.set_channel(Channel::Prometheus);
503
504    let request = decode_remote_read_request(body).await?;
505
506    // Extract schema from special labels and set it in query context
507    if let Some(schema) = extract_schema_from_read_request(&request) {
508        query_ctx.set_current_schema(&schema);
509    }
510
511    let query_ctx = Arc::new(query_ctx);
512    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_READ_ELAPSED
513        .with_label_values(&[db.as_str()])
514        .start_timer();
515
516    state.prom_store_handler.read(request, query_ctx).await
517}
518
519async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
520    let buf = snappy_decompress(&body[..])?;
521
522    ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
523}
524
525#[cfg(test)]
526mod tests {
527    use api::prom_store::remote::ReadRequest;
528    use api::v1::RowInsertRequests;
529    use async_trait::async_trait;
530    use common_query::Output;
531    use pipeline::GreptimePipelineParams;
532    use session::context::{QueryContext, QueryContextRef};
533
534    use super::*;
535    use crate::prom_remote_write::validation::PromValidationMode;
536    use crate::prom_store::Metrics;
537    use crate::query_handler::PromStoreProtocolHandler;
538
539    #[test]
540    fn test_remote_write_proto() {
541        assert!(matches!(
542            remote_write_proto(content_type(
543                "application/x-protobuf;proto=io.prometheus.write.v2.Request"
544            )),
545            RemoteWriteProto::V2
546        ));
547        assert!(matches!(
548            remote_write_proto(content_type(
549                "application/x-protobuf; proto=\"io.prometheus.write.v2.Request\""
550            )),
551            RemoteWriteProto::V2
552        ));
553        assert!(matches!(
554            remote_write_proto(content_type(
555                "APPLICATION/X-PROTOBUF;proto=io.prometheus.write.v2.Request"
556            )),
557            RemoteWriteProto::V2
558        ));
559        assert!(matches!(
560            remote_write_proto(content_type("application/x-protobuf")),
561            RemoteWriteProto::V1
562        ));
563        assert!(matches!(
564            remote_write_proto(content_type(
565                "application/x-protobuf;proto=prometheus.WriteRequest"
566            )),
567            RemoteWriteProto::V1
568        ));
569        assert!(matches!(
570            remote_write_proto(content_type(
571                "application/x-protobuf;proto=unknown.WriteRequest"
572            )),
573            RemoteWriteProto::Unsupported(_)
574        ));
575        assert!(matches!(
576            remote_write_proto(content_type(
577                "application/json;proto=io.prometheus.write.v2.Request"
578            )),
579            RemoteWriteProto::Unsupported(_)
580        ));
581        assert!(matches!(remote_write_proto(None), RemoteWriteProto::V1));
582    }
583
584    fn content_type(value: &str) -> Option<TypedHeader<headers::ContentType>> {
585        Some(TypedHeader(std::str::FromStr::from_str(value).unwrap()))
586    }
587
588    #[test]
589    fn test_prepare_remote_write_context_stamps_semantics() {
590        let (_, query_ctx, _timer) = prepare_remote_write_context(
591            &RemoteWriteQuery::default(),
592            QueryContext::with("greptime", "public"),
593            REMOTE_WRITE_V2_VERSION,
594        );
595
596        assert_eq!(
597            query_ctx.extension(SEMANTIC_SIGNAL_TYPE),
598            Some(SIGNAL_TYPE_METRIC)
599        );
600        assert_eq!(
601            query_ctx.extension(SEMANTIC_SOURCE),
602            Some(SOURCE_PROMETHEUS)
603        );
604        assert_eq!(
605            query_ctx.extension(SEMANTIC_SOURCE_VERSION),
606            Some(REMOTE_WRITE_V2_VERSION)
607        );
608        assert_eq!(
609            query_ctx.extension(SEMANTIC_METRIC_METADATA_QUALITY),
610            Some(METADATA_QUALITY_INFERRED)
611        );
612    }
613
614    #[tokio::test]
615    async fn test_remote_write_v2_ignores_pipeline() {
616        let request = api::greptime_proto::io::prometheus::write::v2::Request {
617            symbols: vec![String::new()],
618            timeseries: Vec::new(),
619        };
620        let body =
621            Bytes::from(crate::prom_store::snappy_compress(&request.encode_to_vec()).unwrap());
622
623        let response = remote_write_v2(
624            test_state(),
625            RemoteWriteQuery::default(),
626            QueryContext::with("greptime", "public"),
627            pipeline_info(Some("pipeline")),
628            false,
629            body,
630        )
631        .await
632        .unwrap();
633
634        assert_eq!(response.status(), StatusCode::NO_CONTENT);
635        assert_eq!(
636            Some("0"),
637            response
638                .headers()
639                .get(REMOTE_WRITE_V2_SAMPLES_WRITTEN_HEADER)
640                .map(|x| x.to_str().unwrap())
641        );
642    }
643
644    fn test_state() -> PromStoreState {
645        PromStoreState {
646            prom_store_handler: Arc::new(NoopPromStoreHandler),
647            pipeline_handler: None,
648            prom_store_with_metric_engine: false,
649            prom_validation_mode: PromValidationMode::Strict,
650            pending_rows_batcher: None,
651        }
652    }
653
654    fn pipeline_info(pipeline_name: Option<&str>) -> PipelineInfo {
655        PipelineInfo {
656            pipeline_name: pipeline_name.map(ToString::to_string),
657            pipeline_version: None,
658            pipeline_params: GreptimePipelineParams::default(),
659        }
660    }
661
662    struct NoopPromStoreHandler;
663
664    #[async_trait]
665    impl PromStoreProtocolHandler for NoopPromStoreHandler {
666        async fn write(
667            &self,
668            _request: RowInsertRequests,
669            _ctx: QueryContextRef,
670            _with_metric_engine: bool,
671        ) -> Result<Output> {
672            unreachable!("empty remote write v2 request should not write")
673        }
674
675        async fn read(
676            &self,
677            _request: ReadRequest,
678            _ctx: QueryContextRef,
679        ) -> Result<PromStoreResponse> {
680            unimplemented!()
681        }
682
683        async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {
684            unimplemented!()
685        }
686    }
687}