servers/http/event.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use api::v1::RowInsertRequests;
use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu,
    Result, UnsupportedContentTypeSnafu,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

/// Query parameters of the log ingester API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database to which log data will be written.
    pub db: Option<String>,
    /// The table to which log data will be written.
    pub table: Option<String>,
    /// The pipeline to be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion.
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, the whole log is taken as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than the server's arrival time.
    /// Valid formats:
    /// - <field_name>;epoch;<resolution>
    /// - <field_name>;datestr;<format>
    ///
    /// If an error occurs while parsing the config, the error is returned in the response.
    /// If an error occurs while ingesting the data, `ignore_errors` determines whether the
    /// error should be ignored; if so, the current server timestamp is used as the event time.
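    ///
    /// Illustrative examples (field names and exact spellings are hypothetical):
    /// `ts;epoch;ms` or `created_at;datestr;%Y-%m-%d %H:%M:%S`, assuming the
    /// resolution is spelled `ms` and `datestr` takes a chrono-style format string.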
    pub custom_time_index: Option<String>,
}

/// PipelineIngestRequest is the internal request for log ingestion. The raw log input can be
/// transformed into multiple `PipelineIngestRequest`s, which will all be ingested into the
/// same database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<PipelineMap>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_iso8601_string()),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Fold a stream of parsed NDJSON values into a single flat array.
/// On success, this always returns an array.
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<Value, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<Value>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    Value::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    Value::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Dry-run the pipeline with the given data.
async fn dryrun_pipeline_inner(
    value: Vec<PipelineMap>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params);
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .into_iter()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
                        map.insert(
                            data_type_key.to_string(),
                            Value::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            Value::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            Value::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        Value::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][column_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        Value::Object(map)
                                    })
                                    .unwrap_or(Value::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), Value::Array(schema));
                result.insert("rows".to_string(), Value::Array(rows));
                result.insert("table_name".to_string(), Value::String(table_name));
                let result = Value::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(Value::Array(results)).into_response())
}

/// Parameters for dry-running a pipeline with given data.
/// `pipeline_name` and `pipeline_version` identify a pipeline stored in the db,
/// while `pipeline` carries raw pipeline content directly.
/// `data` is the input to run through the pipeline; it may be a list of strings
/// or a list of objects.
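///
/// An illustrative request body (names and values are hypothetical):
/// ```json
/// { "pipeline_name": "my_pipeline", "data": [ { "message": "hello" } ] }
/// ```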
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data: Vec<Value>,
}

/// Check that the payload is valid JSON and contains `pipeline` or
/// `pipeline_name` along with the data.
/// Returns `Some` if valid, `None` otherwise.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // valid payload: either `pipeline` or `pipeline_name` is set
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // neither `pipeline` nor `pipeline_name` is present
        Ok(_) => None,
        // invalid json
        Err(_) => None,
    }
}

/// Check that the pipeline_name exists.
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Check that the data length does not exceed the dry-run limit of 10 entries.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data must contain no more than 10 entries",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = pipeline::json_array_to_map(params.data).context(PipelineSnafu)?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // This path preserves backward compatibility with the previous dry-run API,
            // where the payload is just data (JSON or plain text) and the pipeline name
            // is specified via a query param.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // validate source and payload
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        headers,
    )
    .await
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
    Ok(match content_type {
        ct if ct == *JSON_CONTENT_TYPE => {
            // `simd_json` does not yet support streaming or NDJSON,
            // see https://github.com/simd-lite/simd-json/issues/349
            pipeline::json_array_to_map(transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            )?)
            .context(PipelineSnafu)?
        }
        ct if ct == *NDJSON_CONTENT_TYPE => {
            let mut result = Vec::with_capacity(1000);
            for (index, line) in payload.lines().enumerate() {
                let mut line = match line {
                    Ok(line) if !line.is_empty() => line,
                    Ok(_) => continue, // Skip empty lines
                    Err(_) if ignore_errors => continue,
                    Err(e) => {
                        warn!(e; "invalid string at index: {}", index);
                        return InvalidParameterSnafu {
                            reason: format!("invalid line at index: {}", index),
                        }
                        .fail();
                    }
                };

                // According to its documentation, `simd_json` only de-escapes strings at the
                // character level, like any other JSON parser, so mutating the buffer in place
                // is safe here.
                if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                    let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                    result.push(v);
                } else if !ignore_errors {
                    warn!("invalid JSON at index: {}, content: {:?}", index, line);
                    return InvalidParameterSnafu {
                        reason: format!("invalid JSON at index: {}", index),
                    }
                    .fail();
                }
            }
            result
        }
        ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
            .lines()
            .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
            .map(|line| {
                let mut map = PipelineMap::new();
                map.insert("message".to_string(), pipeline::Value::String(line));
                map
            })
            .collect::<Vec<_>>(),

        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    headers: HeaderMap,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut insert_requests = Vec::with_capacity(log_ingest_requests.len());

    let pipeline_params = GreptimePipelineParams::from_params(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    );

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params);
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        insert_requests.extend(requests);
    }

    let output = handler
        .insert(
            RowInsertRequests {
                inserts: insert_requests,
            },
            query_ctx,
        )
        .await;

    if let Ok(Output {
        data: OutputData::AffectedRows(rows),
        meta: _,
    }) = &output
    {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(*rows as u64);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    } else {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(vec![output])
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

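/// Validates raw log payloads before they are parsed and ingested.
/// A minimal sketch of an implementation (hypothetical, for illustration only):
///
/// ```ignore
/// struct RejectEmpty;
///
/// #[async_trait]
/// impl LogValidator for RejectEmpty {
///     async fn validate(&self, _source: Option<&str>, payload: &Bytes)
///         -> Option<Result<HttpResponse>> {
///         // Short-circuit ingestion with an error response when the body is empty.
///         payload.is_empty().then(|| {
///             InvalidParameterSnafu { reason: "empty payload" }.fail()
///         })
///     }
/// }
/// ```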
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload by source before processing.
    /// Return `Some` to indicate validation failure.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

/// Axum state struct holding the log handler, validator, and ingest interceptor.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = PipelineMap::new();
        map1.insert("a".to_string(), pipeline::Value::Uint64(1));
        let mut map2 = PipelineMap::new();
        map2.insert("c".to_string(), pipeline::Value::Uint64(1));
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
}
817}