Skip to main content

servers/http/
event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Display;
17use std::io::BufRead;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Instant;
21
22use api::helper::pb_value_to_value_ref;
23use async_trait::async_trait;
24use axum::body::Bytes;
25use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
26use axum::http::header::CONTENT_TYPE;
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::{IntoResponse, Response};
29use axum::{Extension, Json};
30use axum_extra::TypedHeader;
31use common_catalog::consts::default_engine;
32use common_error::ext::{BoxedError, ErrorExt};
33use common_query::{Output, OutputData};
34use common_telemetry::{error, warn};
35use headers::ContentType;
36use lazy_static::lazy_static;
37use mime_guess::mime;
38use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
39use pipeline::util::to_pipeline_version;
40use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
41use prometheus::{HistogramVec, IntCounterVec};
42use serde::{Deserialize, Serialize};
43use serde_json::{Deserializer, Map, Value as JsonValue, json};
44use session::context::{Channel, QueryContext, QueryContextRef};
45use simd_json::Buffers;
46use snafu::{OptionExt, ResultExt, ensure};
47use store_api::mito_engine_options::APPEND_MODE_KEY;
48use strum::{EnumIter, IntoEnumIterator};
49use table::table_reference::TableReference;
50use vrl::value::{KeyString, Value as VrlValue};
51
52use crate::error::{
53    Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
54    status_code_to_http_status,
55};
56use crate::http::HttpResponse;
57use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
58use crate::http::header::{
59    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
60};
61use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
62use crate::http::result::greptime_result_v1::GreptimedbV1Response;
63use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
64use crate::metrics::{
65    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
66    METRIC_SUCCESS_VALUE,
67};
68use crate::pipeline::run_pipeline;
69use crate::query_handler::PipelineHandlerRef;
70
/// Pipeline names starting with this prefix are reserved for built-in pipelines: `add_pipeline`
/// refuses to create them and `query_pipeline_ddl` refuses to generate DDL for them.
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
/// Key of the pipeline parameter controlling whether ingestion skips errors; `log_ingester`
/// sets it from the `skip_error` query parameter only when the params header lacks it.
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

/// Hint attached to generated DDL when the pipeline may route rows to tables other than the
/// requested one (dispatcher or table suffix), so the table name is not fixed.
const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
/// Hint attached to generated DDL when the target table already exists, so the generated
/// statement may differ from the existing table definition.
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";
78
79lazy_static! {
80    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
81    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
82    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
83    pub static ref PB_CONTENT_TYPE: ContentType =
84        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
85    pub static ref NDJSON_CONTENT_TYPE: ContentType =
86        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
87}
88
/// Query parameters shared by the log ingestion and pipeline management HTTP APIs
/// (log ingestion, pipeline dry run, pipeline query, DDL generation and pipeline deletion).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database where log data will be written to.
    pub db: Option<String>,
    /// The table where log data will be written to.
    pub table: Option<String>,
    /// The pipeline that will be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion (or, for the management APIs,
    /// the version to query or delete).
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, it will take the whole log as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than server's arrival time.
    /// Valid formats:
    /// - `<field_name>;epoch;<resolution>`
    /// - `<field_name>;datestr;<format>`
    ///
    /// If an error occurs while parsing the config, the error will be returned in the response.
    /// If an error occurs while ingesting the data, the `ignore_errors` will be used to determine if the error should be ignored.
    /// If so, use the current server's timestamp as the event time.
    pub custom_time_index: Option<String>,
    /// Whether to skip errors during log ingestion.
    /// If set to true, the ingestion will continue even if there are errors in the data.
    /// If set to false, the ingestion will stop at the first error.
    /// This is different from `ignore_errors`, which is used to ignore errors during the pipeline execution.
    /// This query parameter has lower priority than the `skip_error` entry of the
    /// `x-greptime-pipeline-params` header: it is only applied when the header does not set it.
    pub skip_error: Option<bool>,
}
123
/// PipelineIngestRequest is the internal request for pipeline-based ingestion. The raw input can
/// be transformed into multiple `PipelineIngestRequest`s.
/// Multiple `PipelineIngestRequest`s will be ingested into the same database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<VrlValue>,
}
133
/// Raw pipeline definition text taken from an HTTP request body by the `FromRequest`
/// extractor implemented for this type.
pub struct PipelineContent(String);
135
136impl<S> FromRequest<S> for PipelineContent
137where
138    S: Send + Sync,
139{
140    type Rejection = Response;
141
142    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
143        let content_type_header = req.headers().get(CONTENT_TYPE);
144        let content_type = content_type_header.and_then(|value| value.to_str().ok());
145        if let Some(content_type) = content_type {
146            if content_type.ends_with("yaml") {
147                let payload = String::from_request(req, state)
148                    .await
149                    .map_err(IntoResponse::into_response)?;
150                return Ok(Self(payload));
151            }
152
153            if content_type.starts_with("multipart/form-data") {
154                let mut payload: Multipart = Multipart::from_request(req, state)
155                    .await
156                    .map_err(IntoResponse::into_response)?;
157                let file = payload
158                    .next_field()
159                    .await
160                    .map_err(IntoResponse::into_response)?;
161                let payload = file
162                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
163                    .text()
164                    .await
165                    .map_err(IntoResponse::into_response)?;
166                return Ok(Self(payload));
167            }
168        }
169
170        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
171    }
172}
173
174#[axum_macros::debug_handler]
175pub async fn query_pipeline(
176    State(state): State<LogState>,
177    Extension(mut query_ctx): Extension<QueryContext>,
178    Query(query_params): Query<LogIngesterQueryParams>,
179    Path(pipeline_name): Path<String>,
180) -> Result<GreptimedbManageResponse> {
181    let start = Instant::now();
182    let handler = state.log_handler;
183    ensure!(
184        !pipeline_name.is_empty(),
185        InvalidParameterSnafu {
186            reason: "pipeline_name is required in path",
187        }
188    );
189
190    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
191
192    query_ctx.set_channel(Channel::Log);
193    let query_ctx = Arc::new(query_ctx);
194
195    let (pipeline, pipeline_version) = handler
196        .get_pipeline_str(&pipeline_name, version, query_ctx)
197        .await?;
198
199    Ok(GreptimedbManageResponse::from_pipeline(
200        pipeline_name,
201        query_params
202            .version
203            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
204        start.elapsed().as_millis() as u64,
205        Some(pipeline),
206    ))
207}
208
209/// Generate DDL from pipeline definition.
210#[axum_macros::debug_handler]
211pub async fn query_pipeline_ddl(
212    State(state): State<LogState>,
213    Extension(mut query_ctx): Extension<QueryContext>,
214    Query(query_params): Query<LogIngesterQueryParams>,
215    Path(pipeline_name): Path<String>,
216) -> Result<GreptimedbManageResponse> {
217    let start = Instant::now();
218    let handler = state.log_handler;
219    ensure!(
220        !pipeline_name.is_empty(),
221        InvalidParameterSnafu {
222            reason: "pipeline_name is required in path",
223        }
224    );
225    ensure!(
226        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
227        InvalidParameterSnafu {
228            reason: "built-in pipelines don't have fixed table schema",
229        }
230    );
231    let table_name = query_params.table.context(InvalidParameterSnafu {
232        reason: "table name is required",
233    })?;
234
235    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
236
237    query_ctx.set_channel(Channel::Log);
238    let query_ctx = Arc::new(query_ctx);
239
240    let pipeline = handler
241        .get_pipeline(&pipeline_name, version, query_ctx.clone())
242        .await?;
243
244    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
245        reason: "auto transform doesn't have fixed table schema",
246    })?;
247
248    let schema = query_ctx.current_schema();
249    let table_name_ref = TableReference {
250        catalog: query_ctx.current_catalog(),
251        schema: &schema,
252        table: &table_name,
253    };
254
255    let mut create_table_expr =
256        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
257            .map_err(BoxedError::new)
258            .context(OtherSnafu)?;
259
260    // manually set the append_mode to true
261    create_table_expr
262        .table_options
263        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());
264
265    let expr = expr_to_create(&create_table_expr, None)
266        .map_err(BoxedError::new)
267        .context(OtherSnafu)?;
268
269    let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() {
270        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
271    } else if pipeline.is_variant_table_name() {
272        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
273    } else {
274        None
275    };
276
277    let sql = SqlOutput {
278        sql: format!("{:#}", expr),
279        message,
280    };
281
282    Ok(GreptimedbManageResponse::from_sql(
283        sql,
284        start.elapsed().as_millis() as u64,
285    ))
286}
287
288#[axum_macros::debug_handler]
289pub async fn add_pipeline(
290    State(state): State<LogState>,
291    Path(pipeline_name): Path<String>,
292    Extension(mut query_ctx): Extension<QueryContext>,
293    PipelineContent(payload): PipelineContent,
294) -> Result<GreptimedbManageResponse> {
295    let start = Instant::now();
296    let handler = state.log_handler;
297    ensure!(
298        !pipeline_name.is_empty(),
299        InvalidParameterSnafu {
300            reason: "pipeline_name is required in path",
301        }
302    );
303    ensure!(
304        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
305        InvalidParameterSnafu {
306            reason: "pipeline_name cannot start with greptime_",
307        }
308    );
309    ensure!(
310        !payload.is_empty(),
311        InvalidParameterSnafu {
312            reason: "pipeline is required in body",
313        }
314    );
315
316    query_ctx.set_channel(Channel::Log);
317    let query_ctx = Arc::new(query_ctx);
318
319    let content_type = "yaml";
320    let result = handler
321        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
322        .await;
323
324    result
325        .map(|pipeline| {
326            GreptimedbManageResponse::from_pipeline(
327                pipeline_name,
328                pipeline.0.to_timezone_aware_string(None),
329                start.elapsed().as_millis() as u64,
330                None,
331            )
332        })
333        .map_err(|e| {
334            error!(e; "failed to insert pipeline");
335            e
336        })
337}
338
339#[axum_macros::debug_handler]
340pub async fn delete_pipeline(
341    State(state): State<LogState>,
342    Extension(mut query_ctx): Extension<QueryContext>,
343    Query(query_params): Query<LogIngesterQueryParams>,
344    Path(pipeline_name): Path<String>,
345) -> Result<GreptimedbManageResponse> {
346    let start = Instant::now();
347    let handler = state.log_handler;
348    ensure!(
349        !pipeline_name.is_empty(),
350        InvalidParameterSnafu {
351            reason: "pipeline_name is required",
352        }
353    );
354
355    let version_str = query_params.version.context(InvalidParameterSnafu {
356        reason: "version is required",
357    })?;
358
359    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
360
361    query_ctx.set_channel(Channel::Log);
362    let query_ctx = Arc::new(query_ctx);
363
364    handler
365        .delete_pipeline(&pipeline_name, version, query_ctx)
366        .await
367        .map(|v| {
368            if v.is_some() {
369                GreptimedbManageResponse::from_pipeline(
370                    pipeline_name,
371                    version_str,
372                    start.elapsed().as_millis() as u64,
373                    None,
374                )
375            } else {
376                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
377            }
378        })
379        .map_err(|e| {
380            error!(e; "failed to delete pipeline");
381            e
382        })
383}
384
385/// Transform NDJSON array into a single array
386/// always return an array
387fn transform_ndjson_array_factory(
388    values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
389    ignore_error: bool,
390) -> Result<Vec<VrlValue>> {
391    values
392        .into_iter()
393        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
394            Ok(item_value) => {
395                match item_value {
396                    VrlValue::Array(item_array) => {
397                        acc_array.extend(item_array);
398                    }
399                    VrlValue::Object(_) => {
400                        acc_array.push(item_value);
401                    }
402                    _ => {
403                        if !ignore_error {
404                            warn!("invalid item in array: {:?}", item_value);
405                            return InvalidParameterSnafu {
406                                reason: format!("invalid item: {} in array", item_value),
407                            }
408                            .fail();
409                        }
410                    }
411                }
412                Ok(acc_array)
413            }
414            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
415            Err(_) => {
416                warn!("invalid item in array: {:?}", item);
417                Ok(acc_array)
418            }
419        })
420}
421
/// Dry-runs `pipeline` over `value` and renders the produced rows as a JSON response.
///
/// The pipeline is executed through `run_pipeline` with default `GreptimePipelineParams` and a
/// placeholder target table named `dry_run`. The response body is a JSON array with one object
/// per produced request that carries rows (requests without rows are skipped):
///
/// - `table_name`: the table name carried by that request;
/// - `schema`: one `{name, data_type, column_type, fulltext}` object per column, where
///   `fulltext` is true when the column options contain a `fulltext` key;
/// - `rows`: one array per row, each cell being `{value, key, semantic_type, data_type}`.
///
/// # Errors
///
/// Propagates any error returned by `run_pipeline`.
///
/// # Panics
///
/// Panics if a produced value cannot be converted into `serde_json::Value` (see the `unwrap`
/// below). NOTE(review): consider propagating that conversion error instead of panicking.
async fn dryrun_pipeline_inner(
    value: Vec<VrlValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        // NOTE(review): presumably the dry-run flag of `run_pipeline` — confirm against its signature.
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    // One JSON object per produced request; requests without rows are dropped.
    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let result_schema = rows.schema;

                // Column descriptions, also reused below to label each cell by position.
                let schema = result_schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        // NOTE(review): clones the whole options value just to test for a key;
                        // `as_ref()` would avoid the copy.
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        // Cells are positional: the idx-th value belongs to the idx-th column.
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                let mut map = Map::new();
                                let value_ref = pb_value_to_value_ref(
                                    &v,
                                    result_schema[idx].datatype_extension.as_ref(),
                                );
                                let greptime_value: datatypes::value::Value = value_ref.into();
                                let serde_json_value =
                                    serde_json::Value::try_from(greptime_value).unwrap();
                                map.insert("value".to_string(), serde_json_value);
                                map.insert("key".to_string(), schema[idx][name_key].clone());
                                map.insert(
                                    "semantic_type".to_string(),
                                    schema[idx][column_type_key].clone(),
                                );
                                map.insert(
                                    "data_type".to_string(),
                                    schema[idx][data_type_key].clone(),
                                );
                                JsonValue::Object(map)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}
529
/// JSON body accepted by the pipeline dry-run endpoint.
///
/// The pipeline to run is given either inline via `pipeline` (raw definition text) or by
/// reference via `pipeline_name` plus an optional `pipeline_version`. At least one of `pipeline`
/// or `pipeline_name` must be present for a body to be recognized as this form.
/// `data` holds the sample input as a raw string, decoded using `data_type` as its content
/// type (`application/json` when omitted). The data might be a list of strings or of objects.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    /// Name of a stored pipeline; only used when `pipeline` is absent.
    pub pipeline_name: Option<String>,
    /// Version of the stored pipeline named by `pipeline_name`.
    pub pipeline_version: Option<String>,
    /// Inline pipeline definition; takes precedence over `pipeline_name`.
    pub pipeline: Option<String>,
    /// Content type used to decode `data`; defaults to `application/json`.
    pub data_type: Option<String>,
    /// Sample input to run through the pipeline.
    pub data: String,
}
543
544/// Check if the payload is valid json
545/// Check if the payload contains pipeline or pipeline_name and data
546/// Return Some if valid, None if invalid
547fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
548    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
549        // payload with pipeline or pipeline_name and data is array
550        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
551        // because of the pipeline_name or pipeline is required
552        Ok(_) => None,
553        // invalid json
554        Err(_) => None,
555    }
556}
557
558/// Check if the pipeline_name exists
559fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
560    pipeline_name.context(InvalidParameterSnafu {
561        reason: "pipeline_name is required",
562    })
563}
564
565/// Check if the data length less than 10
566fn check_data_valid(data_len: usize) -> Result<()> {
567    ensure!(
568        data_len <= 10,
569        InvalidParameterSnafu {
570            reason: "data is required",
571        }
572    );
573    Ok(())
574}
575
576fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
577    let body = Json(json!({
578        "error": format!("{}: {}", step_msg,e.output_msg()),
579    }));
580
581    (status_code_to_http_status(&e.status_code()), body).into_response()
582}
583
584/// Parse the data with given content type
585/// If the content type is invalid, return error
586/// content type is one of application/json, text/plain, application/x-ndjson
587fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
588    if let Ok(content_type) = ContentType::from_str(&data_type) {
589        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
590    } else {
591        InvalidParameterSnafu {
592            reason: format!(
593                "invalid content type: {}, expected: one of {}",
594                data_type,
595                EventPayloadResolver::support_content_type_list().join(", ")
596            ),
597        }
598        .fail()
599    }
600}
601
602#[axum_macros::debug_handler]
603pub async fn pipeline_dryrun(
604    State(log_state): State<LogState>,
605    Query(query_params): Query<LogIngesterQueryParams>,
606    Extension(mut query_ctx): Extension<QueryContext>,
607    TypedHeader(content_type): TypedHeader<ContentType>,
608    payload: Bytes,
609) -> Result<Response> {
610    let handler = log_state.log_handler;
611
612    query_ctx.set_channel(Channel::Log);
613    let query_ctx = Arc::new(query_ctx);
614
615    match check_pipeline_dryrun_params_valid(&payload) {
616        Some(params) => {
617            let data = parse_dryrun_data(
618                params.data_type.unwrap_or("application/json".to_string()),
619                params.data,
620            )?;
621
622            check_data_valid(data.len())?;
623
624            match params.pipeline {
625                None => {
626                    let version = to_pipeline_version(params.pipeline_version.as_deref())
627                        .context(PipelineSnafu)?;
628                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
629                    let pipeline = handler
630                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
631                        .await?;
632                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
633                }
634                Some(pipeline) => {
635                    let pipeline = handler.build_pipeline(&pipeline);
636                    match pipeline {
637                        Ok(pipeline) => {
638                            match dryrun_pipeline_inner(
639                                data,
640                                Arc::new(pipeline),
641                                handler,
642                                &query_ctx,
643                            )
644                            .await
645                            {
646                                Ok(response) => Ok(response),
647                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
648                                    "Failed to exec pipeline",
649                                    e,
650                                )),
651                            }
652                        }
653                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
654                            "Failed to build pipeline",
655                            e,
656                        )),
657                    }
658                }
659            }
660        }
661        None => {
662            // This path is for back compatibility with the previous dry run code
663            // where the payload is just data (JSON or plain text) and the pipeline name
664            // is specified using query param.
665            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
666
667            let version =
668                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
669
670            let ignore_errors = query_params.ignore_errors.unwrap_or(false);
671
672            let value =
673                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
674
675            check_data_valid(value.len())?;
676
677            let pipeline = handler
678                .get_pipeline(&pipeline_name, version, query_ctx.clone())
679                .await?;
680
681            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
682        }
683    }
684}
685
686pub(crate) fn extract_pipeline_params_map_from_headers(
687    headers: &HeaderMap,
688) -> ahash::HashMap<String, String> {
689    GreptimePipelineParams::parse_header_str_to_map(
690        headers
691            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
692            .and_then(|v| v.to_str().ok()),
693    )
694}
695
696#[axum_macros::debug_handler]
697pub async fn log_ingester(
698    State(log_state): State<LogState>,
699    Query(query_params): Query<LogIngesterQueryParams>,
700    Extension(mut query_ctx): Extension<QueryContext>,
701    TypedHeader(content_type): TypedHeader<ContentType>,
702    headers: HeaderMap,
703    payload: Bytes,
704) -> Result<HttpResponse> {
705    // validate source and payload
706    let source = query_params.source.as_deref();
707    let response = match &log_state.log_validator {
708        Some(validator) => validator.validate(source, &payload).await,
709        None => None,
710    };
711    if let Some(response) = response {
712        return response;
713    }
714
715    let handler = log_state.log_handler;
716
717    let table_name = query_params.table.context(InvalidParameterSnafu {
718        reason: "table is required",
719    })?;
720
721    let ignore_errors = query_params.ignore_errors.unwrap_or(false);
722
723    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
724        reason: "pipeline_name is required",
725    })?;
726    let skip_error = query_params.skip_error.unwrap_or(false);
727    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
728    let pipeline = PipelineDefinition::from_name(
729        &pipeline_name,
730        version,
731        query_params.custom_time_index.map(|s| (s, ignore_errors)),
732    )
733    .context(PipelineSnafu)?;
734
735    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
736
737    query_ctx.set_channel(Channel::Log);
738    let query_ctx = Arc::new(query_ctx);
739
740    let value = log_state
741        .ingest_interceptor
742        .as_ref()
743        .pre_pipeline(value, query_ctx.clone())?;
744
745    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
746    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
747        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
748    }
749    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);
750
751    ingest_logs_inner(
752        handler,
753        pipeline,
754        vec![PipelineIngestRequest {
755            table: table_name,
756            values: value,
757        }],
758        query_ctx,
759        pipeline_params,
760    )
761    .await
762}
763
/// Payload encodings understood by `EventPayloadResolver`.
///
/// Variant order is observable: `support_content_type_list` iterates the variants (via
/// `EnumIter`) in declaration order to build the "expected one of" list in error messages.
#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    /// `application/json`.
    Json,
    /// Newline-delimited JSON (`application/` + `CONTENT_TYPE_NDJSON_SUBTYPE_STR`).
    Ndjson,
    /// `text/plain`.
    Text,
}
770
771impl Display for EventPayloadResolverInner {
772    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
773        match self {
774            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
775            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
776            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
777        }
778    }
779}
780
781impl TryFrom<&ContentType> for EventPayloadResolverInner {
782    type Error = Error;
783
784    fn try_from(content_type: &ContentType) -> Result<Self> {
785        let mime: mime_guess::Mime = content_type.clone().into();
786        match (mime.type_(), mime.subtype()) {
787            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
788            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
789                Ok(EventPayloadResolverInner::Ndjson)
790            }
791            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
792            _ => InvalidParameterSnafu {
793                reason: format!(
794                    "invalid content type: {}, expected: one of {}",
795                    content_type,
796                    EventPayloadResolver::support_content_type_list().join(", ")
797                ),
798            }
799            .fail(),
800        }
801    }
802}
803
/// Pairs a resolved payload encoding with the request content type it came from.
#[derive(Debug)]
struct EventPayloadResolver<'a> {
    /// Encoding selected for the payload, derived from `content_type`.
    inner: EventPayloadResolverInner,
    /// The content type of the payload.
    /// Kept for logging the original content type; currently unread, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}
812
813impl EventPayloadResolver<'_> {
814    pub(super) fn support_content_type_list() -> Vec<String> {
815        EventPayloadResolverInner::iter()
816            .map(|x| x.to_string())
817            .collect()
818    }
819}
820
821impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
822    type Error = Error;
823
824    fn try_from(content_type: &'a ContentType) -> Result<Self> {
825        let inner = EventPayloadResolverInner::try_from(content_type)?;
826        Ok(EventPayloadResolver {
827            inner,
828            content_type,
829        })
830    }
831}
832
833impl EventPayloadResolver<'_> {
834    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
835        match self.inner {
836            EventPayloadResolverInner::Json => transform_ndjson_array_factory(
837                Deserializer::from_slice(&payload).into_iter(),
838                ignore_errors,
839            ),
840            EventPayloadResolverInner::Ndjson => {
841                let mut result = Vec::with_capacity(1000);
842                let mut buffer = Buffers::new(1000);
843                for (index, line) in payload.lines().enumerate() {
844                    let mut line = match line {
845                        Ok(line) if !line.is_empty() => line,
846                        Ok(_) => continue, // Skip empty lines
847                        Err(_) if ignore_errors => continue,
848                        Err(e) => {
849                            warn!(e; "invalid string at index: {}", index);
850                            return InvalidParameterSnafu {
851                                reason: format!("invalid line at index: {}", index),
852                            }
853                            .fail();
854                        }
855                    };
856
857                    // simd_json, according to description, only de-escapes string at character level,
858                    // like any other json parser. So it should be safe here.
859                    if let Ok(v) = simd_json::serde::from_slice_with_buffers(
860                        unsafe { line.as_bytes_mut() },
861                        &mut buffer,
862                    ) {
863                        result.push(v);
864                    } else if !ignore_errors {
865                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
866                        return InvalidParameterSnafu {
867                            reason: format!("invalid JSON at index: {}", index),
868                        }
869                        .fail();
870                    }
871                }
872                Ok(result)
873            }
874            EventPayloadResolverInner::Text => {
875                let result = payload
876                    .lines()
877                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
878                    .map(|line| {
879                        let mut map = BTreeMap::new();
880                        map.insert(
881                            KeyString::from("message"),
882                            VrlValue::Bytes(Bytes::from(line)),
883                        );
884                        VrlValue::Object(map)
885                    })
886                    .collect::<Vec<_>>();
887                Ok(result)
888            }
889        }
890    }
891}
892
893fn extract_pipeline_value_by_content_type(
894    content_type: ContentType,
895    payload: Bytes,
896    ignore_errors: bool,
897) -> Result<Vec<VrlValue>> {
898    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
899        resolver
900            .parse_payload(payload, ignore_errors)
901            .map_err(|e| match &e {
902                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
903                    if reason.contains("invalid item:") {
904                        InvalidParameterSnafu {
905                            reason: "json format error, please check the date is valid JSON.",
906                        }
907                        .build()
908                    } else {
909                        e
910                    }
911                }
912                _ => e,
913            })
914    })
915}
916
917pub(crate) async fn ingest_logs_inner(
918    handler: PipelineHandlerRef,
919    pipeline: PipelineDefinition,
920    log_ingest_requests: Vec<PipelineIngestRequest>,
921    query_ctx: QueryContextRef,
922    pipeline_params: GreptimePipelineParams,
923) -> Result<HttpResponse> {
924    // Keep the timer boundary before pipeline execution to preserve existing
925    // ingestion elapsed metrics.
926    let exec_timer = Instant::now();
927    let mut req = ContextReq::default();
928
929    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
930    for pipeline_req in log_ingest_requests {
931        let requests =
932            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;
933
934        req.merge(requests);
935    }
936
937    execute_log_context_req(
938        handler,
939        req,
940        query_ctx,
941        exec_timer,
942        &METRIC_HTTP_LOGS_INGESTION_COUNTER,
943        &METRIC_HTTP_LOGS_INGESTION_ELAPSED,
944    )
945    .await
946}
947
948pub(crate) async fn execute_log_context_req(
949    handler: PipelineHandlerRef,
950    ctx_req: ContextReq,
951    query_ctx: QueryContextRef,
952    exec_timer: Instant,
953    counter: &IntCounterVec,
954    elapsed: &HistogramVec,
955) -> Result<HttpResponse> {
956    let db = query_ctx.get_db_string();
957
958    let mut outputs = Vec::with_capacity(ctx_req.map_len());
959    let mut total_rows: u64 = 0;
960    let mut fail = false;
961    for (temp_ctx, act_req) in ctx_req.as_req_iter(query_ctx) {
962        let output = handler.insert(act_req, temp_ctx).await;
963
964        if let Ok(Output {
965            data: OutputData::AffectedRows(rows),
966            meta: _,
967        }) = &output
968        {
969            total_rows += *rows as u64;
970        } else {
971            fail = true;
972        }
973        outputs.push(output);
974    }
975
976    // Record one aggregate metric sample for the whole ingestion request.
977    if total_rows > 0 {
978        counter.with_label_values(&[db.as_str()]).inc_by(total_rows);
979        elapsed
980            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
981            .observe(exec_timer.elapsed().as_secs_f64());
982    }
983    if fail {
984        elapsed
985            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
986            .observe(exec_timer.elapsed().as_secs_f64());
987    }
988
989    let response = GreptimedbV1Response::from_output(outputs)
990        .await
991        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
992    Ok(response)
993}
994
/// Hook for validating a raw log payload before it is processed.
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validates `payload`, optionally using its `source`, before processing.
    ///
    /// Returns `None` when the payload passes validation. A `Some` value indicates
    /// validation failure and carries the result to use instead
    /// (NOTE(review): presumably returned to the client as-is; confirm at the call site).
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
    -> Option<Result<HttpResponse>>;
}

/// Shared handle to a [`LogValidator`] implementation (the trait requires `Send + Sync`).
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
1004
/// Axum state shared by the log ingestion handlers: the pipeline handler plus
/// optional validation and interception hooks.
#[derive(Clone)]
pub struct LogState {
    /// Pipeline handler; the same handler type is used by `run_pipeline` and for
    /// inserts in this module.
    pub log_handler: PipelineHandlerRef,
    /// Optional payload validator; `None` means no validation hook is installed.
    pub log_validator: Option<LogValidatorRef>,
    /// Optional ingestion interceptor
    /// (NOTE(review): its hook points are defined by `LogIngestInterceptor`, not visible here).
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
1012
#[cfg(test)]
mod tests {

    use super::*;

    /// Serializes what `transform_ndjson_array_factory` produces for `input`.
    fn ndjson_to_json_string(input: &str) -> String {
        let values =
            transform_ndjson_array_factory(Deserializer::from_str(input).into_iter(), false)
                .unwrap();
        serde_json::to_string(&values).unwrap()
    }

    #[test]
    fn test_transform_ndjson() {
        let cases = [
            // Two newline-separated objects.
            ("{\"a\": 1}\n{\"b\": 2}", "[{\"a\":1},{\"b\":2}]"),
            // A single object.
            ("{\"a\": 1}", "[{\"a\":1}]"),
            // A one-element array yields its element.
            ("[{\"a\": 1}]", "[{\"a\":1}]"),
            // A multi-element array yields each element.
            ("[{\"a\": 1}, {\"b\": 2}]", "[{\"a\":1},{\"b\":2}]"),
        ];
        for (input, expected) in cases {
            assert_eq!(ndjson_to_json_string(input), expected, "input: {input}");
        }
    }

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        // `application/json`: only the document before the malformed one is returned.
        let json_values =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true)
                .expect("json extraction should succeed when ignoring errors");
        assert_eq!(json_values, vec![json!({"a": 1}).into()]);

        // NDJSON: only the malformed line is dropped.
        let ndjson_values =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true)
                .expect("ndjson extraction should succeed when ignoring errors");

        let object = |key: &str| {
            let mut fields = BTreeMap::new();
            fields.insert(KeyString::from(key), VrlValue::Integer(1));
            VrlValue::Object(fields)
        };
        assert_eq!(ndjson_values, vec![object("a"), object("c")]);
    }
}