servers/http/event.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

/// Query parameters for the log ingester API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database where log data will be written.
    pub db: Option<String>,
    /// The table where log data will be written.
    pub table: Option<String>,
    /// The pipeline that will be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion.
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, the whole log is taken as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than the server's arrival time.
    /// Valid formats:
    /// - <field_name>;epoch;<resolution>
    /// - <field_name>;datestr;<format>
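    ///
    /// For example, assuming a hypothetical input field named `ts` (illustrative only;
    /// see the pipeline documentation for the accepted resolution and format tokens):
    /// - `ts;epoch;ms`
    /// - `ts;datestr;%Y-%m-%dT%H:%M:%S`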
    ///
    /// If an error occurs while parsing the config, the error is returned in the response.
    /// If an error occurs while ingesting the data, `ignore_errors` determines whether the
    /// error should be ignored; if so, the current server timestamp is used as the event time.
    pub custom_time_index: Option<String>,
}

/// PipelineIngestRequest is the internal request for log ingestion. The raw log input can be
/// transformed into multiple `PipelineIngestRequest`s, all of which are ingested into the same
/// database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<PipelineMap>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}
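
// A sketch of how a pipeline definition might be uploaded through this
// extractor (illustrative only: the route is registered elsewhere, and the
// URL below is a hypothetical example):
//
//   curl -X POST 'http://127.0.0.1:4000/v1/events/pipelines/my_pipeline' \
//        -H 'Content-Type: application/x-yaml' \
//        --data-binary @pipeline.yaml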

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Fold a stream of NDJSON values into a single JSON array.
/// Always returns an array: top-level arrays are flattened into it and objects are appended.
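///
/// For example, the NDJSON input `{"a":1}\n[{"b":2},{"c":3}]` folds into
/// `[{"a":1},{"b":2},{"c":3}]` (see `test_transform_ndjson` at the bottom of this file).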
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<Value, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<Value>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    Value::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    Value::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Dry-run a pipeline with the given data.
async fn dryrun_pipeline_inner(
    value: Vec<PipelineMap>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
                        map.insert(
                            data_type_key.to_string(),
                            Value::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            Value::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            Value::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        Value::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][column_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        Value::Object(map)
                                    })
                                    .unwrap_or(Value::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), Value::Array(schema));
                result.insert("rows".to_string(), Value::Array(rows));
                result.insert("table_name".to_string(), Value::String(table_name));
                let result = Value::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(Value::Array(results)).into_response())
}

/// Dry-run a pipeline with the given data.
///
/// Use `pipeline_name` and `pipeline_version` to refer to a pipeline stored in the db,
/// or `pipeline` to supply the raw pipeline content directly.
/// `data` carries the input and might be a list of strings or a list of objects.
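///
/// A minimal request body sketch (field values are illustrative, not from this file):
/// `{"pipeline_name": "my_pipeline", "data_type": "application/json", "data": "[{\"msg\":\"hello\"}]"}`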
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}

/// Check that the payload is valid JSON and contains either `pipeline` or `pipeline_name`.
/// Returns `Some` if valid, `None` if invalid.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // valid payload: either `pipeline` or `pipeline_name` is present
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // one of `pipeline_name` or `pipeline` is required
        Ok(_) => None,
        // invalid JSON
        Err(_) => None,
    }
}

/// Check that the pipeline_name is present.
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Check that the number of data entries does not exceed 10.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data length must not exceed 10",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parse the data with the given content type, which must be one of
/// application/json, application/x-ndjson, or text/plain.
/// Returns an error if the content type is invalid.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineMap>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected: one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // This path is for backward compatibility with the previous dry-run code,
            // where the payload is just data (JSON or plain text) and the pipeline name
            // is specified using a query param.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // validate source and payload
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        headers,
    )
    .await
}
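
// A sketch of driving this handler over HTTP (illustrative only: the route is
// registered elsewhere, and the URL below is a hypothetical example):
//
//   curl -X POST 'http://127.0.0.1:4000/v1/events/logs?db=public&table=logs&pipeline_name=my_pipeline' \
//        -H 'Content-Type: application/x-ndjson' \
//        --data-binary $'{"msg":"hello"}\n{"msg":"world"}'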

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        match content_type {
            x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
            x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
            x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
                Ok(EventPayloadResolverInner::Text)
            }
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The content type of the payload.
    /// Kept for logging the original content type.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineMap>> {
        match self.inner {
            EventPayloadResolverInner::Json => {
                pipeline::json_array_to_map(transform_ndjson_array_factory(
                    Deserializer::from_slice(&payload).into_iter(),
                    ignore_errors,
                )?)
                .context(PipelineSnafu)
            }
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue, // Skip empty lines
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // According to its documentation, simd_json only de-escapes strings at the
                    // character level, like any other JSON parser, so mutating the buffer in
                    // place is safe here.
                    if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                        let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = PipelineMap::new();
                        map.insert("message".to_string(), pipeline::Value::String(line));
                        map
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    headers: HeaderMap,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_params = GreptimePipelineParams::from_params(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    );

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload by source before processing.
    /// Returns a `Some` result to indicate validation failure.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
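
// A minimal `LogValidator` sketch (illustrative only; `SourceAllowList` and the
// "untrusted" source name are hypothetical):
//
//   struct SourceAllowList;
//
//   #[async_trait]
//   impl LogValidator for SourceAllowList {
//       async fn validate(&self, source: Option<&str>, _payload: &Bytes)
//           -> Option<Result<HttpResponse>> {
//           // Reject the payload outright when it comes from a disallowed source.
//           (source == Some("untrusted")).then(|| {
//               InvalidParameterSnafu { reason: "source not allowed" }.fail()
//           })
//       }
//   }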

/// Axum state struct holding the log handler and validator.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }
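
    // An additional sketch covering the error path of `transform_ndjson_array_factory`:
    // scalar top-level items are neither arrays nor objects, so they are rejected
    // unless `ignore_error` is set.
    #[test]
    fn test_transform_ndjson_invalid_item() {
        let s = "1\n{\"a\": 1}";

        // A scalar top-level value errors in strict mode.
        let strict = transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false);
        assert!(strict.is_err());

        // With ignore_error set, the scalar is skipped and the object is kept.
        let lenient =
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), true).unwrap();
        assert_eq!(Value::Array(lenient).to_string(), "[{\"a\":1}]");
    }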

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = PipelineMap::new();
        map1.insert("a".to_string(), pipeline::Value::Uint64(1));
        let mut map2 = PipelineMap::new();
        map2.insert("c".to_string(), pipeline::Value::Uint64(1));
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
}
937}