// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use pipeline::util::to_pipeline_version;
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value as PipelineValue,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value as JsonValue};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{
    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

/// LogIngesterQueryParams holds the query params of the log ingester API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database where log data will be written to.
    pub db: Option<String>,
    /// The table where log data will be written to.
    pub table: Option<String>,
    /// The pipeline that will be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion.
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, the whole log is taken as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than the server's arrival time.
    /// Valid formats:
    /// - <field_name>;epoch;<resolution>
    /// - <field_name>;datestr;<format>
    ///
    /// If an error occurs while parsing the config, the error will be returned in the response.
    /// If an error occurs while ingesting the data, `ignore_errors` determines whether the error
    /// should be ignored; if so, the current server timestamp is used as the event time.
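    ///
    /// A hypothetical example, assuming a millisecond epoch resolution and a
    /// chrono-style date format (both syntaxes are assumptions, not defined in this file):
    /// `ts;epoch;ms` or `created_at;datestr;%Y-%m-%dT%H:%M:%S`.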
    pub custom_time_index: Option<String>,
    /// Whether to skip errors during log ingestion.
    /// If set to true, the ingestion continues even if there are errors in the data.
    /// If set to false, the ingestion stops at the first error.
    /// This is different from `ignore_errors`, which ignores errors during pipeline execution.
    /// This query param has lower priority than the `x-greptime-pipeline-params` header.
    pub skip_error: Option<bool>,
}
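
// A hypothetical ingest call using these params (the route and port are
// assumptions, not taken from this file):
//
//   POST /v1/events/logs?db=public&table=app_logs&pipeline_name=my_pipeline&skip_error=true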

/// PipelineIngestRequest is the internal request for log ingestion. The raw log input can be
/// transformed into multiple `PipelineIngestRequest`s, which will be ingested into the same
/// database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<PipelineValue>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}
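
// A hypothetical upload of a pipeline definition (endpoint and port are
// assumptions, not taken from this file). Any content type ending in "yaml"
// is accepted, as is multipart/form-data:
//
//   curl -X POST 'http://127.0.0.1:4000/v1/events/pipelines/my_pipeline' \
//        -H 'Content-Type: application/x-yaml' \
//        --data-binary @pipeline.yaml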

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Transform NDJSON input (a stream of JSON values) into a single flat array;
/// always returns an array.
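///
/// For example, the NDJSON input `{"a": 1}\n{"b": 2}` folds into `[{"a":1},{"b":2}]`,
/// and a top-level array `[{"a": 1}, {"b": 2}]` passes through unchanged.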
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<JsonValue, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<JsonValue>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    JsonValue::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    JsonValue::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Dry-run the pipeline with the given data.
async fn dryrun_pipeline_inner(
    value: Vec<PipelineValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][column_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        JsonValue::Object(map)
                                    })
                                    .unwrap_or(JsonValue::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}

/// Dry-run a pipeline with the given data.
/// `pipeline_name` and `pipeline_version` specify a pipeline stored in the db;
/// `pipeline` specifies raw pipeline content instead.
/// `data` carries the input, which may be a list of strings or a list of objects.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}
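
// A hypothetical dry-run request body (field names come from `PipelineDryrunParams`;
// the pipeline YAML itself is elided):
//
// {
//   "pipeline": "<pipeline yaml>",
//   "data_type": "application/json",
//   "data": "[{\"message\": \"hello world\"}]"
// }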

/// Check if the payload is valid JSON and contains `pipeline` or `pipeline_name`.
/// Returns `Some` if valid, `None` if invalid.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // Either `pipeline` or `pipeline_name` is required.
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        Ok(_) => None,
        // Invalid JSON.
        Err(_) => None,
    }
}

/// Check if the pipeline_name exists
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Check that the data length does not exceed 10 entries.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data can contain at most 10 entries",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parse the data with the given content type.
/// Returns an error if the content type is invalid; it must be one of
/// application/json, application/x-ndjson, or text/plain.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineValue>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected: one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // This path is for backward compatibility with the previous dry-run code,
            // where the payload is just data (JSON or plain text) and the pipeline name
            // is specified using a query param.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

pub(crate) fn extract_pipeline_params_map_from_headers(
    headers: &HeaderMap,
) -> ahash::HashMap<String, String> {
    GreptimePipelineParams::parse_header_str_to_map(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    )
}
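
// A hypothetical header carrying pipeline params (the exact pair syntax is an
// assumption; see `GreptimePipelineParams::parse_header_str_to_map`):
//
//   x-greptime-pipeline-params: skip_error=true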

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // Validate source and payload.
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}
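
// A hypothetical ingestion call (route, port, and header pair syntax are
// assumptions, not taken from this file):
//
//   curl -X POST 'http://127.0.0.1:4000/v1/events/logs?db=public&table=app_logs&pipeline_name=my_pipeline' \
//        -H 'Content-Type: application/x-ndjson' \
//        -H 'x-greptime-pipeline-params: skip_error=true' \
//        --data-binary $'{"message":"hello"}\n{"message":"world"}'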

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The content type of the payload.
    /// Kept for logging the original content type.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => {
                pipeline::json_array_to_map(transform_ndjson_array_factory(
                    Deserializer::from_slice(&payload).into_iter(),
                    ignore_errors,
                )?)
                .context(PipelineSnafu)
            }
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue, // Skip empty lines.
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // simd_json, per its documentation, only de-escapes strings at the
                    // character level, like any other JSON parser, so mutating the line
                    // buffer in place is safe here.
                    if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                        let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
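                // Each non-empty line becomes a single-key map; e.g. the two-line
                // payload "foo\nbar" yields [{"message": "foo"}, {"message": "bar"}].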
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert("message".to_string(), PipelineValue::String(line));
                        PipelineValue::Map(map.into())
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineValue>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload by source before processing.
    /// Returns `Some` to indicate validation failure.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
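
// A minimal sketch of a custom validator (names and behavior are illustrative
// only, not part of this crate):
//
// struct RejectEmpty;
//
// #[async_trait]
// impl LogValidator for RejectEmpty {
//     async fn validate(&self, _source: Option<&str>, payload: &Bytes)
//         -> Option<Result<HttpResponse>> {
//         payload.is_empty().then(|| {
//             InvalidParameterSnafu {
//                 reason: "empty payload",
//             }
//             .fail()
//         })
//     }
// }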

/// Axum state struct holding the log handler and validator.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }
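
    #[test]
    fn test_transform_ndjson_ignore_error() {
        // A sketch of the ignore_error path: with ignore_error set to true,
        // scalar items are skipped rather than failing the whole batch.
        let s = "1\n{\"a\": 1}";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), true).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");
    }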

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert("a".to_string(), PipelineValue::Uint64(1));
        let map1 = PipelineValue::Map(map1.into());
        let mut map2 = BTreeMap::new();
        map2.insert("c".to_string(), PipelineValue::Uint64(1));
        let map2 = PipelineValue::Map(map2.into());
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
}