Skip to main content

servers/http/
event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::fmt::Display;
17use std::io::BufRead;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Instant;
21
22use api::helper::pb_value_to_value_ref;
23use async_trait::async_trait;
24use axum::body::Bytes;
25use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
26use axum::http::header::CONTENT_TYPE;
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::{IntoResponse, Response};
29use axum::{Extension, Json};
30use axum_extra::TypedHeader;
31use common_catalog::consts::default_engine;
32use common_error::ext::{BoxedError, ErrorExt};
33use common_query::{Output, OutputData};
34use common_telemetry::{error, warn};
35use headers::ContentType;
36use lazy_static::lazy_static;
37use mime_guess::mime;
38use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
39use pipeline::util::to_pipeline_version;
40use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
41use serde::{Deserialize, Serialize};
42use serde_json::{Deserializer, Map, Value as JsonValue, json};
43use session::context::{Channel, QueryContext, QueryContextRef};
44use simd_json::Buffers;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::mito_engine_options::APPEND_MODE_KEY;
47use strum::{EnumIter, IntoEnumIterator};
48use table::table_reference::TableReference;
49use vrl::value::{KeyString, Value as VrlValue};
50
51use crate::error::{
52    Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
53    status_code_to_http_status,
54};
55use crate::http::HttpResponse;
56use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
57use crate::http::header::{
58    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
59};
60use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
61use crate::http::result::greptime_result_v1::GreptimedbV1Response;
62use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
63use crate::metrics::{
64    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
65    METRIC_SUCCESS_VALUE,
66};
67use crate::pipeline::run_pipeline;
68use crate::query_handler::PipelineHandlerRef;
69
/// Prefix reserved for built-in pipelines; user-defined pipeline names must
/// not start with it (enforced by the add/ddl handlers below).
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
/// Pipeline-params key that controls whether per-row ingestion errors are skipped.
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

// Caveat messages attached to generated CREATE TABLE SQL when the DDL may not
// match the actual table that ingestion would write to.
const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";

lazy_static! {
    // Pre-built `ContentType` values used when matching request payload formats.
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}
87
/// LogIngesterQueryParams is used for query params of log ingester API.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database where log data will be written to.
    pub db: Option<String>,
    /// The table where log data will be written to.
    pub table: Option<String>,
    /// The pipeline that will be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion.
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, it will take the whole log as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than server's arrival time.
    /// Valid formats:
    /// - <field_name>;epoch;<resolution>
    /// - <field_name>;datestr;<format>
    ///
    /// If an error occurs while parsing the config, the error will be returned in the response.
    /// If an error occurs while ingesting the data, the `ignore_errors` will be used to determine if the error should be ignored.
    /// If so, use the current server's timestamp as the event time.
    pub custom_time_index: Option<String>,
    /// Whether to skip errors during log ingestion.
    /// If set to true, the ingestion will continue even if there are errors in the data.
    /// If set to false, the ingestion will stop at the first error.
    /// This is different from `ignore_errors`, which is used to ignore errors during the pipeline execution.
    /// The priority of this query param is lower than that of the `x-greptime-pipeline-params` header.
    pub skip_error: Option<bool>,
}
122
/// PipelineIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple PipelineIngestRequests.
/// Multiple PipelineIngestRequests will be ingested into the same database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<VrlValue>,
}
132
133pub struct PipelineContent(String);
134
135impl<S> FromRequest<S> for PipelineContent
136where
137    S: Send + Sync,
138{
139    type Rejection = Response;
140
141    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
142        let content_type_header = req.headers().get(CONTENT_TYPE);
143        let content_type = content_type_header.and_then(|value| value.to_str().ok());
144        if let Some(content_type) = content_type {
145            if content_type.ends_with("yaml") {
146                let payload = String::from_request(req, state)
147                    .await
148                    .map_err(IntoResponse::into_response)?;
149                return Ok(Self(payload));
150            }
151
152            if content_type.starts_with("multipart/form-data") {
153                let mut payload: Multipart = Multipart::from_request(req, state)
154                    .await
155                    .map_err(IntoResponse::into_response)?;
156                let file = payload
157                    .next_field()
158                    .await
159                    .map_err(IntoResponse::into_response)?;
160                let payload = file
161                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
162                    .text()
163                    .await
164                    .map_err(IntoResponse::into_response)?;
165                return Ok(Self(payload));
166            }
167        }
168
169        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
170    }
171}
172
173#[axum_macros::debug_handler]
174pub async fn query_pipeline(
175    State(state): State<LogState>,
176    Extension(mut query_ctx): Extension<QueryContext>,
177    Query(query_params): Query<LogIngesterQueryParams>,
178    Path(pipeline_name): Path<String>,
179) -> Result<GreptimedbManageResponse> {
180    let start = Instant::now();
181    let handler = state.log_handler;
182    ensure!(
183        !pipeline_name.is_empty(),
184        InvalidParameterSnafu {
185            reason: "pipeline_name is required in path",
186        }
187    );
188
189    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
190
191    query_ctx.set_channel(Channel::Log);
192    let query_ctx = Arc::new(query_ctx);
193
194    let (pipeline, pipeline_version) = handler
195        .get_pipeline_str(&pipeline_name, version, query_ctx)
196        .await?;
197
198    Ok(GreptimedbManageResponse::from_pipeline(
199        pipeline_name,
200        query_params
201            .version
202            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
203        start.elapsed().as_millis() as u64,
204        Some(pipeline),
205    ))
206}
207
/// Generate DDL from pipeline definition.
///
/// Resolves the pipeline's declared schema and renders the `CREATE TABLE` SQL
/// that would be used for the given table, with `append_mode` forced to true.
/// Built-in (`greptime_`-prefixed) pipelines and auto-transform pipelines are
/// rejected because their table schema is not fixed.
#[axum_macros::debug_handler]
pub async fn query_pipeline_ddl(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "built-in pipelines don't have fixed table schema",
        }
    );
    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table name is required",
    })?;

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let pipeline = handler
        .get_pipeline(&pipeline_name, version, query_ctx.clone())
        .await?;

    // Auto-transform pipelines derive their schema from data, so no fixed DDL exists.
    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
        reason: "auto transform doesn't have fixed table schema",
    })?;

    let schema = query_ctx.current_schema();
    let table_name_ref = TableReference {
        catalog: query_ctx.current_catalog(),
        schema: &schema,
        table: &table_name,
    };

    let mut create_table_expr =
        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
            .map_err(BoxedError::new)
            .context(OtherSnafu)?;

    // manually set the append_mode to true
    create_table_expr
        .table_options
        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());

    let expr = expr_to_create(&create_table_expr, None)
        .map_err(BoxedError::new)
        .context(OtherSnafu)?;

    // Attach a caveat when the generated SQL may not reflect reality: either
    // the table already exists, or the pipeline can route rows to variant
    // table names (dispatcher/table_suffix).
    let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() {
        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
    } else if pipeline.is_variant_table_name() {
        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
    } else {
        None
    };

    let sql = SqlOutput {
        // `{:#}` renders the pretty (alternate) form of the CREATE expression.
        sql: format!("{:#}", expr),
        message,
    };

    Ok(GreptimedbManageResponse::from_sql(
        sql,
        start.elapsed().as_millis() as u64,
    ))
}
286
287#[axum_macros::debug_handler]
288pub async fn add_pipeline(
289    State(state): State<LogState>,
290    Path(pipeline_name): Path<String>,
291    Extension(mut query_ctx): Extension<QueryContext>,
292    PipelineContent(payload): PipelineContent,
293) -> Result<GreptimedbManageResponse> {
294    let start = Instant::now();
295    let handler = state.log_handler;
296    ensure!(
297        !pipeline_name.is_empty(),
298        InvalidParameterSnafu {
299            reason: "pipeline_name is required in path",
300        }
301    );
302    ensure!(
303        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
304        InvalidParameterSnafu {
305            reason: "pipeline_name cannot start with greptime_",
306        }
307    );
308    ensure!(
309        !payload.is_empty(),
310        InvalidParameterSnafu {
311            reason: "pipeline is required in body",
312        }
313    );
314
315    query_ctx.set_channel(Channel::Log);
316    let query_ctx = Arc::new(query_ctx);
317
318    let content_type = "yaml";
319    let result = handler
320        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
321        .await;
322
323    result
324        .map(|pipeline| {
325            GreptimedbManageResponse::from_pipeline(
326                pipeline_name,
327                pipeline.0.to_timezone_aware_string(None),
328                start.elapsed().as_millis() as u64,
329                None,
330            )
331        })
332        .map_err(|e| {
333            error!(e; "failed to insert pipeline");
334            e
335        })
336}
337
338#[axum_macros::debug_handler]
339pub async fn delete_pipeline(
340    State(state): State<LogState>,
341    Extension(mut query_ctx): Extension<QueryContext>,
342    Query(query_params): Query<LogIngesterQueryParams>,
343    Path(pipeline_name): Path<String>,
344) -> Result<GreptimedbManageResponse> {
345    let start = Instant::now();
346    let handler = state.log_handler;
347    ensure!(
348        !pipeline_name.is_empty(),
349        InvalidParameterSnafu {
350            reason: "pipeline_name is required",
351        }
352    );
353
354    let version_str = query_params.version.context(InvalidParameterSnafu {
355        reason: "version is required",
356    })?;
357
358    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
359
360    query_ctx.set_channel(Channel::Log);
361    let query_ctx = Arc::new(query_ctx);
362
363    handler
364        .delete_pipeline(&pipeline_name, version, query_ctx)
365        .await
366        .map(|v| {
367            if v.is_some() {
368                GreptimedbManageResponse::from_pipeline(
369                    pipeline_name,
370                    version_str,
371                    start.elapsed().as_millis() as u64,
372                    None,
373                )
374            } else {
375                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
376            }
377        })
378        .map_err(|e| {
379            error!(e; "failed to delete pipeline");
380            e
381        })
382}
383
384/// Transform NDJSON array into a single array
385/// always return an array
386fn transform_ndjson_array_factory(
387    values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
388    ignore_error: bool,
389) -> Result<Vec<VrlValue>> {
390    values
391        .into_iter()
392        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
393            Ok(item_value) => {
394                match item_value {
395                    VrlValue::Array(item_array) => {
396                        acc_array.extend(item_array);
397                    }
398                    VrlValue::Object(_) => {
399                        acc_array.push(item_value);
400                    }
401                    _ => {
402                        if !ignore_error {
403                            warn!("invalid item in array: {:?}", item_value);
404                            return InvalidParameterSnafu {
405                                reason: format!("invalid item: {} in array", item_value),
406                            }
407                            .fail();
408                        }
409                    }
410                }
411                Ok(acc_array)
412            }
413            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
414            Err(_) => {
415                warn!("invalid item in array: {:?}", item);
416                Ok(acc_array)
417            }
418        })
419}
420
/// Dryrun pipeline with given data
///
/// Runs `value` through `pipeline` against a throwaway "dry_run" table name
/// and renders the produced rows (schema, values, semantic types) as a JSON
/// response, without writing anything to storage.
async fn dryrun_pipeline_inner(
    value: Vec<VrlValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    // The trailing `true` flags this invocation as a dry run.
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    // JSON keys used in both the schema objects and the per-cell row objects.
    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let result_schema = rows.schema;

                // One JSON object per column: name, data type, semantic type,
                // and whether a "fulltext" option is present on the column.
                let schema = result_schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                // Render each cell together with its column metadata so the
                // client can show rows without re-joining against `schema`.
                // Cell order matches `result_schema`, hence the index lookups.
                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                let mut map = Map::new();
                                let value_ref = pb_value_to_value_ref(
                                    &v,
                                    result_schema[idx].datatype_extension.as_ref(),
                                );
                                let greptime_value: datatypes::value::Value = value_ref.into();
                                // NOTE(review): this unwrap assumes every value a
                                // pipeline emits is representable as JSON — confirm.
                                let serde_json_value =
                                    serde_json::Value::try_from(greptime_value).unwrap();
                                map.insert("value".to_string(), serde_json_value);
                                map.insert("key".to_string(), schema[idx][name_key].clone());
                                map.insert(
                                    "semantic_type".to_string(),
                                    schema[idx][column_type_key].clone(),
                                );
                                map.insert(
                                    "data_type".to_string(),
                                    schema[idx][data_type_key].clone(),
                                );
                                JsonValue::Object(map)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                // Requests that produced no rows are omitted from the output.
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}
528
/// Dryrun pipeline with given data
/// pipeline_name and pipeline_version to specify pipeline stored in db
/// pipeline to specify pipeline raw content
/// data to specify data
/// data might be a list of string or a list of object
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    /// Name of a pipeline stored in the database (used with `pipeline_version`).
    pub pipeline_name: Option<String>,
    /// Version of the stored pipeline; latest when unset.
    pub pipeline_version: Option<String>,
    /// Raw pipeline content; when set it is used instead of `pipeline_name`.
    pub pipeline: Option<String>,
    /// Content type of `data` (e.g. application/json); defaults to JSON when unset.
    pub data_type: Option<String>,
    /// The sample data to run the pipeline against.
    pub data: String,
}
542
543/// Check if the payload is valid json
544/// Check if the payload contains pipeline or pipeline_name and data
545/// Return Some if valid, None if invalid
546fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
547    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
548        // payload with pipeline or pipeline_name and data is array
549        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
550        // because of the pipeline_name or pipeline is required
551        Ok(_) => None,
552        // invalid json
553        Err(_) => None,
554    }
555}
556
557/// Check if the pipeline_name exists
558fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
559    pipeline_name.context(InvalidParameterSnafu {
560        reason: "pipeline_name is required",
561    })
562}
563
564/// Check if the data length less than 10
565fn check_data_valid(data_len: usize) -> Result<()> {
566    ensure!(
567        data_len <= 10,
568        InvalidParameterSnafu {
569            reason: "data is required",
570        }
571    );
572    Ok(())
573}
574
575fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
576    let body = Json(json!({
577        "error": format!("{}: {}", step_msg,e.output_msg()),
578    }));
579
580    (status_code_to_http_status(&e.status_code()), body).into_response()
581}
582
583/// Parse the data with given content type
584/// If the content type is invalid, return error
585/// content type is one of application/json, text/plain, application/x-ndjson
586fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
587    if let Ok(content_type) = ContentType::from_str(&data_type) {
588        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
589    } else {
590        InvalidParameterSnafu {
591            reason: format!(
592                "invalid content type: {}, expected: one of {}",
593                data_type,
594                EventPayloadResolver::support_content_type_list().join(", ")
595            ),
596        }
597        .fail()
598    }
599}
600
/// Dry-run a pipeline over sample data without persisting anything.
///
/// Two request shapes are accepted:
/// 1. A JSON body of [`PipelineDryrunParams`] carrying the data plus either a
///    stored pipeline name/version or raw pipeline content.
/// 2. (legacy) A raw data body, with the pipeline name/version given as query
///    params and the data format taken from the `Content-Type` header.
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            // Data format defaults to JSON when `data_type` is not provided.
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                // No raw content: look up a pipeline stored in the database.
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                // Raw pipeline content supplied inline: build it ad hoc.
                // Build/exec failures are converted into a JSON error body
                // with a step prefix instead of bubbling up as handler errors.
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // This path is for back compatibility with the previous dry run code
            // where the payload is just data (JSON or plain text) and the pipeline name
            // is specified using query param.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}
684
685pub(crate) fn extract_pipeline_params_map_from_headers(
686    headers: &HeaderMap,
687) -> ahash::HashMap<String, String> {
688    GreptimePipelineParams::parse_header_str_to_map(
689        headers
690            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
691            .and_then(|v| v.to_str().ok()),
692    )
693}
694
/// HTTP handler for log ingestion: validates the request, resolves the target
/// table and pipeline from query parameters, parses the payload according to
/// its `Content-Type`, and hands the rows to the pipeline for ingestion.
#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // validate source and payload
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    // A configured validator may short-circuit the request with its own response.
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    // Let interceptors transform the parsed values before the pipeline runs.
    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    // A header-provided skip_error wins over the query param of the same name.
    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}
762
/// Payload encodings supported by the log ingestion endpoints, derived from
/// the request `Content-Type` (see the `TryFrom<&ContentType>` impl below).
#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}
769
770impl Display for EventPayloadResolverInner {
771    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
772        match self {
773            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
774            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
775            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
776        }
777    }
778}
779
impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    /// Maps a MIME type to a payload kind: `application/json`,
    /// `application/<ndjson-subtype>`, or `text/plain`. Any other content
    /// type is rejected as an invalid parameter listing the supported ones.
    fn try_from(content_type: &ContentType) -> Result<Self> {
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            // NDJSON has no named constant in the mime crate, so compare the
            // subtype string directly.
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}
802
/// Resolves a request `Content-Type` into a concrete payload parser
/// (see `parse_payload`).
#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The content type of the payload.
    /// keep it for logging original content type
    #[allow(dead_code)]
    content_type: &'a ContentType,
}
811
812impl EventPayloadResolver<'_> {
813    pub(super) fn support_content_type_list() -> Vec<String> {
814        EventPayloadResolverInner::iter()
815            .map(|x| x.to_string())
816            .collect()
817    }
818}
819
820impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
821    type Error = Error;
822
823    fn try_from(content_type: &'a ContentType) -> Result<Self> {
824        let inner = EventPayloadResolverInner::try_from(content_type)?;
825        Ok(EventPayloadResolver {
826            inner,
827            content_type,
828        })
829    }
830}
831
impl EventPayloadResolver<'_> {
    /// Parses the raw request body into a list of VRL values according to the
    /// resolved payload format.
    ///
    /// - `Json`: the whole body is parsed as a stream of JSON documents via
    ///   `transform_ndjson_array_factory`.
    /// - `Ndjson`: each non-empty line is parsed as an independent JSON value.
    /// - `Text`: each non-empty line becomes an object `{"message": <line>}`.
    ///
    /// When `ignore_errors` is true, unreadable or unparsable lines are
    /// silently skipped; otherwise the first bad line fails the whole request
    /// with an `InvalidParameter` error carrying the line index.
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            ),
            EventPayloadResolverInner::Ndjson => {
                // Pre-sized buffers; 1000 is a guess at a typical batch size,
                // both will grow as needed.
                let mut result = Vec::with_capacity(1000);
                let mut buffer = Buffers::new(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue, // Skip empty lines
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // simd_json parses in place and may mutate the line's bytes.
                    // simd_json, according to description, only de-escapes string at character level,
                    // like any other json parser. So it should be safe here.
                    if let Ok(v) = simd_json::serde::from_slice_with_buffers(
                        unsafe { line.as_bytes_mut() },
                        &mut buffer,
                    ) {
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                // Plain text: wrap each non-empty line under a "message" key.
                // Unreadable lines are dropped regardless of `ignore_errors`.
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert(
                            KeyString::from("message"),
                            VrlValue::Bytes(Bytes::from(line)),
                        );
                        VrlValue::Object(map)
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}
891
892fn extract_pipeline_value_by_content_type(
893    content_type: ContentType,
894    payload: Bytes,
895    ignore_errors: bool,
896) -> Result<Vec<VrlValue>> {
897    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
898        resolver
899            .parse_payload(payload, ignore_errors)
900            .map_err(|e| match &e {
901                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
902                    if reason.contains("invalid item:") {
903                        InvalidParameterSnafu {
904                            reason: "json format error, please check the date is valid JSON.",
905                        }
906                        .build()
907                    } else {
908                        e
909                    }
910                }
911                _ => e,
912            })
913    })
914}
915
/// Runs each ingest request through the pipeline, executes the resulting row
/// inserts, records ingestion metrics, and builds the HTTP response.
///
/// Fails fast (`?`) if the pipeline itself errors; individual insert failures
/// are collected into the response instead of aborting the batch.
pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    // Transform every request through the pipeline and merge the produced
    // insert requests into one batch.
    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    // Execute the inserts, tallying affected rows and noting any failure.
    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    // Success and failure metrics are recorded independently: one batch can
    // both insert some rows and fail on other inserts.
    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}
973
/// Hook for validating a raw log payload before it is processed.
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// validate payload by source before processing
    /// Return a `Some` result to indicate validation failure.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
    -> Option<Result<HttpResponse>>;
}
981
/// Shared, thread-safe reference to a [`LogValidator`].
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
983
/// axum state struct to hold log handler and validator
#[derive(Clone)]
pub struct LogState {
    // Handles pipeline execution and row insertion.
    pub log_handler: PipelineHandlerRef,
    // Optional payload validator run before processing.
    pub log_validator: Option<LogValidatorRef>,
    // Optional log-ingest interceptor; NOTE(review): presumably invoked
    // around ingestion — confirm at call sites.
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
991
#[cfg(test)]
mod tests {

    use super::*;

    /// Runs `transform_ndjson_array_factory` over `input` (errors not
    /// ignored) and serializes the result to a compact JSON string.
    fn ndjson_to_string(input: &str) -> String {
        let value =
            transform_ndjson_array_factory(Deserializer::from_str(input).into_iter(), false)
                .unwrap();
        serde_json::to_string(&value).unwrap()
    }

    #[test]
    fn test_transform_ndjson() {
        // Newline-delimited objects are merged into a single array.
        assert_eq!(
            ndjson_to_string("{\"a\": 1}\n{\"b\": 2}"),
            "[{\"a\":1},{\"b\":2}]"
        );
        // A lone object is wrapped in an array.
        assert_eq!(ndjson_to_string("{\"a\": 1}"), "[{\"a\":1}]");
        // Array inputs pass through unchanged.
        assert_eq!(ndjson_to_string("[{\"a\": 1}]"), "[{\"a\":1}]");
        assert_eq!(
            ndjson_to_string("[{\"a\": 1}, {\"b\": 2}]"),
            "[{\"a\":1},{\"b\":2}]"
        );
    }

    #[test]
    fn test_extract_by_content() {
        // The middle line is intentionally malformed JSON.
        let body = Bytes::from_static(
            r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
            .as_bytes(),
        );

        // Plain JSON with ignore_errors: parsing stops at the first bad
        // document, keeping only what was parsed before it.
        let json_result =
            extract_pipeline_value_by_content_type(ContentType::json(), body.clone(), true);
        assert!(json_result.is_ok());
        assert_eq!(json_result.unwrap(), vec![json!({"a": 1}).into()]);

        // NDJSON with ignore_errors: only the bad line is dropped.
        let ndjson_result =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), body, true);
        assert!(ndjson_result.is_ok());

        let mut first = BTreeMap::new();
        first.insert(KeyString::from("a"), VrlValue::Integer(1));
        let mut last = BTreeMap::new();
        last.insert(KeyString::from("c"), VrlValue::Integer(1));
        assert_eq!(
            ndjson_result.unwrap(),
            vec![VrlValue::Object(first), VrlValue::Object(last)]
        );
    }
}
1055}