servers/http/event.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value as JsonValue, json};
use session::context::{Channel, QueryContext, QueryContextRef};
use simd_json::Buffers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::mito_engine_options::APPEND_MODE_KEY;
use strum::{EnumIter, IntoEnumIterator};
use table::table_reference::TableReference;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
    status_code_to_http_status,
};
use crate::http::HttpResponse;
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{
    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

/// LogIngesterQueryParams holds the query params of the log ingester API.
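/// For example (illustrative values), a request may pass these as URL query params:
/// `?db=public&table=app_logs&pipeline_name=my_pipeline&ignore_errors=true`.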
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database where log data will be written to.
    pub db: Option<String>,
    /// The table where log data will be written to.
    pub table: Option<String>,
    /// The pipeline that will be used for log ingestion.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline to be used for log ingestion.
    pub version: Option<String>,
    /// Whether to ignore errors during log ingestion.
    pub ignore_errors: Option<bool>,
    /// The source of the log data.
    pub source: Option<String>,
    /// The JSON field name of the log message. If not provided, it will take the whole log as the message.
    /// The field must be at the top level of the JSON structure.
    pub msg_field: Option<String>,
    /// Specify a custom time index from the input data rather than the server's arrival time.
    /// Valid formats:
    /// - <field_name>;epoch;<resolution>
    /// - <field_name>;datestr;<format>
    ///
    /// If an error occurs while parsing the config, the error is returned in the response.
    /// If an error occurs while ingesting the data, `ignore_errors` determines whether the error should be ignored.
    /// If so, the current server timestamp is used as the event time.
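    ///
    /// For example (illustrative field names; the resolution and format values are assumptions):
    /// `ts;epoch;ms` would read an epoch timestamp in milliseconds from the `ts` field, and
    /// `time;datestr;%Y-%m-%dT%H:%M:%S%z` would parse the `time` field with the given date format.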
    pub custom_time_index: Option<String>,
    /// Whether to skip errors during log ingestion.
    /// If set to true, the ingestion will continue even if there are errors in the data.
    /// If set to false, the ingestion will stop at the first error.
    /// This is different from `ignore_errors`, which is used to ignore errors during the pipeline execution.
    /// This query param has lower priority than the `x-greptime-pipeline-params` header.
    pub skip_error: Option<bool>,
}

/// PipelineIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple PipelineIngestRequests.
/// Multiple PipelineIngestRequests will be ingested into the same database with the same pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The log data to be ingested.
    pub values: Vec<VrlValue>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

/// Generate DDL from pipeline definition.
#[axum_macros::debug_handler]
pub async fn query_pipeline_ddl(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "built-in pipelines don't have fixed table schema",
        }
    );
    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table name is required",
    })?;

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let pipeline = handler
        .get_pipeline(&pipeline_name, version, query_ctx.clone())
        .await?;

    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
        reason: "auto transform doesn't have fixed table schema",
    })?;

    let schema = query_ctx.current_schema();
    let table_name_ref = TableReference {
        catalog: query_ctx.current_catalog(),
        schema: &schema,
        table: &table_name,
    };

    let mut create_table_expr =
        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
            .map_err(BoxedError::new)
            .context(OtherSnafu)?;

    // manually set the append_mode to true
    create_table_expr
        .table_options
        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());

    let expr = expr_to_create(&create_table_expr, None)
        .map_err(BoxedError::new)
        .context(OtherSnafu)?;

    let message = if handler
        .get_table(&table_name, &query_ctx)
        .await
        .context(CatalogSnafu)?
        .is_some()
    {
        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
    } else if pipeline.is_variant_table_name() {
        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
    } else {
        None
    };

    let sql = SqlOutput {
        sql: format!("{:#}", expr),
        message,
    };

    Ok(GreptimedbManageResponse::from_sql(
        sql,
        start.elapsed().as_millis() as u64,
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Transform NDJSON input into a single array.
/// Always returns an array.
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<VrlValue>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    VrlValue::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    VrlValue::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Dryrun pipeline with given data
async fn dryrun_pipeline_inner(
    value: Vec<VrlValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let colume_type_key = "colume_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            colume_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][colume_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        JsonValue::Object(map)
                                    })
                                    .unwrap_or(JsonValue::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}

/// Dryrun pipeline with given data.
/// `pipeline_name` and `pipeline_version` specify a pipeline stored in the db;
/// `pipeline` specifies raw pipeline content.
/// `data` holds the input data, which might be a list of strings or a list of objects.
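///
/// A minimal request body might look like this (illustrative values; the named
/// pipeline is assumed to already exist in the db):
///
/// ```json
/// {
///     "pipeline_name": "my_pipeline",
///     "data_type": "application/json",
///     "data": "{\"message\": \"hello\"}"
/// }
/// ```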
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}

/// Check if the payload is valid JSON
/// and contains either `pipeline` or `pipeline_name` together with `data`.
/// Return `Some` if valid, `None` if invalid.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // the payload provides either `pipeline` or `pipeline_name`
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // either `pipeline_name` or `pipeline` is required
        Ok(_) => None,
        // invalid json
        Err(_) => None,
    }
}

/// Check if the pipeline_name exists
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Check that the data length does not exceed 10 items.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data can contain at most 10 items",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parse the data with the given content type.
/// If the content type is invalid, return an error.
/// The content type must be one of application/json, text/plain, or application/x-ndjson.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected: one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // This path is for backward compatibility with the previous dry run code,
            // where the payload is just data (JSON or plain text) and the pipeline name
            // is specified using a query param.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

pub(crate) fn extract_pipeline_params_map_from_headers(
    headers: &HeaderMap,
) -> ahash::HashMap<String, String> {
    GreptimePipelineParams::parse_header_str_to_map(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    )
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // validate source and payload
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The content type of the payload.
    /// Kept for logging the original content type.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            ),
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                let mut buffer = Buffers::new(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue, // Skip empty lines
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // According to its documentation, simd_json only de-escapes strings at the
                    // character level, like any other JSON parser, so mutating the line buffer in place is safe here.
                    if let Ok(v) = simd_json::serde::from_slice_with_buffers(
                        unsafe { line.as_bytes_mut() },
                        &mut buffer,
                    ) {
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert(
                            KeyString::from("message"),
                            VrlValue::Bytes(Bytes::from(line)),
                        );
                        VrlValue::Object(map)
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<VrlValue>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate payload by source before processing.
    /// Return a `Some` result to indicate validation failure.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
    -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

/// axum state struct to hold log handler and validator
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }
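
    // A sketch of the `ignore_error = true` path, mirroring `test_transform_ndjson`
    // above: items that are neither objects nor arrays are skipped instead of
    // failing the whole batch.
    #[test]
    fn test_transform_ndjson_ignore_error() {
        let s = "1\n{\"a\": 1}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), true).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        // with `ignore_error` disabled, the same input is rejected
        assert!(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).is_err()
        );
    }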

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1}).into()]);

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert(KeyString::from("a"), VrlValue::Integer(1));
        let map1 = VrlValue::Object(map1);
        let mut map2 = BTreeMap::new();
        map2.insert(KeyString::from("c"), VrlValue::Integer(1));
        let map2 = VrlValue::Object(map2);
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
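
    // Sketches of the dry-run helper checks (illustrative tests; the expected
    // behavior follows directly from the helpers above).
    #[test]
    fn test_check_data_valid() {
        // at most 10 items are accepted for a dry run
        assert!(check_data_valid(10).is_ok());
        assert!(check_data_valid(11).is_err());
    }

    #[test]
    fn test_check_pipeline_dryrun_params_valid() {
        // either `pipeline` or `pipeline_name` must be present
        let ok = Bytes::from_static(b"{\"pipeline_name\": \"p\", \"data\": \"{}\"}");
        assert!(check_pipeline_dryrun_params_valid(&ok).is_some());

        let missing = Bytes::from_static(b"{\"data\": \"{}\"}");
        assert!(check_pipeline_dryrun_params_valid(&missing).is_none());

        let invalid = Bytes::from_static(b"not json");
        assert!(check_pipeline_dryrun_params_valid(&invalid).is_none());
    }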
}