1use std::collections::BTreeMap;
16use std::fmt::Display;
17use std::io::BufRead;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Instant;
21
22use api::helper::pb_value_to_value_ref;
23use async_trait::async_trait;
24use axum::body::Bytes;
25use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
26use axum::http::header::CONTENT_TYPE;
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::{IntoResponse, Response};
29use axum::{Extension, Json};
30use axum_extra::TypedHeader;
31use common_catalog::consts::default_engine;
32use common_error::ext::{BoxedError, ErrorExt};
33use common_query::{Output, OutputData};
34use common_telemetry::{error, warn};
35use headers::ContentType;
36use lazy_static::lazy_static;
37use mime_guess::mime;
38use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
39use pipeline::util::to_pipeline_version;
40use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
41use prometheus::{HistogramVec, IntCounterVec};
42use serde::{Deserialize, Serialize};
43use serde_json::{Deserializer, Map, Value as JsonValue, json};
44use session::context::{Channel, QueryContext, QueryContextRef};
45use simd_json::Buffers;
46use snafu::{OptionExt, ResultExt, ensure};
47use store_api::mito_engine_options::APPEND_MODE_KEY;
48use strum::{EnumIter, IntoEnumIterator};
49use table::table_reference::TableReference;
50use vrl::value::{KeyString, Value as VrlValue};
51
52use crate::error::{
53 Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
54 status_code_to_http_status,
55};
56use crate::http::HttpResponse;
57use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
58use crate::http::header::{
59 CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
60};
61use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
62use crate::http::result::greptime_result_v1::GreptimedbV1Response;
63use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
64use crate::metrics::{
65 METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
66 METRIC_SUCCESS_VALUE,
67};
68use crate::pipeline::run_pipeline;
69use crate::query_handler::PipelineHandlerRef;
70
/// Name prefix that the handlers in this file treat as reserved for built-in
/// pipelines: `add_pipeline` rejects names starting with it and
/// `query_pipeline_ddl` refuses to generate DDL for them.
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
/// Pipeline parameter key that `log_ingester` sets to `"true"` when the
/// `skip_error` query parameter is enabled and the header did not provide it.
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

/// Note attached to the generated CREATE TABLE SQL when the pipeline reports a
/// variant table name.
const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
/// Note attached to the generated CREATE TABLE SQL when the target table is
/// already present.
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";
78
// Lazily-initialised `ContentType` values shared by the handlers below.
// The two `from_str(...).unwrap()` initialisers panic on first access if the
// corresponding constant string cannot be parsed as a content type.
lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}
88
/// Query-string parameters accepted by the log ingestion and pipeline
/// management handlers in this file. Every field is optional at the
/// deserialization level; individual handlers enforce what they require.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// Not read by the handlers in this file.
    /// NOTE(review): presumably the target database name — confirm against the callers.
    pub db: Option<String>,
    /// Target table name; required by `log_ingester` and `query_pipeline_ddl`.
    pub table: Option<String>,
    /// Pipeline name; required by `log_ingester` and the legacy dry-run path.
    pub pipeline_name: Option<String>,
    /// Pipeline version string, parsed with `to_pipeline_version`.
    pub version: Option<String>,
    /// When `true`, payload parsing skips invalid items instead of failing.
    /// Defaults to `false` where it is read.
    pub ignore_errors: Option<bool>,
    /// Forwarded to the optional `LogValidator` in `log_ingester`.
    pub source: Option<String>,
    /// Not read by the handlers in this file.
    /// NOTE(review): presumably names the message field of the log — confirm.
    pub msg_field: Option<String>,
    /// Passed (paired with `ignore_errors`) to `PipelineDefinition::from_name`
    /// in `log_ingester`.
    pub custom_time_index: Option<String>,
    /// When `true`, `log_ingester` sets the `skip_error` pipeline parameter to
    /// `"true"` unless the pipeline-params header already contains that key.
    pub skip_error: Option<bool>,
}
123
/// A batch of parsed values to be run through a pipeline for one table.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// Name of the table the values are ingested into.
    pub table: String,
    /// The parsed input values handed to `run_pipeline`.
    pub values: Vec<VrlValue>,
}
133
134pub struct PipelineContent(String);
135
136impl<S> FromRequest<S> for PipelineContent
137where
138 S: Send + Sync,
139{
140 type Rejection = Response;
141
142 async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
143 let content_type_header = req.headers().get(CONTENT_TYPE);
144 let content_type = content_type_header.and_then(|value| value.to_str().ok());
145 if let Some(content_type) = content_type {
146 if content_type.ends_with("yaml") {
147 let payload = String::from_request(req, state)
148 .await
149 .map_err(IntoResponse::into_response)?;
150 return Ok(Self(payload));
151 }
152
153 if content_type.starts_with("multipart/form-data") {
154 let mut payload: Multipart = Multipart::from_request(req, state)
155 .await
156 .map_err(IntoResponse::into_response)?;
157 let file = payload
158 .next_field()
159 .await
160 .map_err(IntoResponse::into_response)?;
161 let payload = file
162 .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
163 .text()
164 .await
165 .map_err(IntoResponse::into_response)?;
166 return Ok(Self(payload));
167 }
168 }
169
170 Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
171 }
172}
173
174#[axum_macros::debug_handler]
175pub async fn query_pipeline(
176 State(state): State<LogState>,
177 Extension(mut query_ctx): Extension<QueryContext>,
178 Query(query_params): Query<LogIngesterQueryParams>,
179 Path(pipeline_name): Path<String>,
180) -> Result<GreptimedbManageResponse> {
181 let start = Instant::now();
182 let handler = state.log_handler;
183 ensure!(
184 !pipeline_name.is_empty(),
185 InvalidParameterSnafu {
186 reason: "pipeline_name is required in path",
187 }
188 );
189
190 let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
191
192 query_ctx.set_channel(Channel::Log);
193 let query_ctx = Arc::new(query_ctx);
194
195 let (pipeline, pipeline_version) = handler
196 .get_pipeline_str(&pipeline_name, version, query_ctx)
197 .await?;
198
199 Ok(GreptimedbManageResponse::from_pipeline(
200 pipeline_name,
201 query_params
202 .version
203 .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
204 start.elapsed().as_millis() as u64,
205 Some(pipeline),
206 ))
207}
208
209#[axum_macros::debug_handler]
211pub async fn query_pipeline_ddl(
212 State(state): State<LogState>,
213 Extension(mut query_ctx): Extension<QueryContext>,
214 Query(query_params): Query<LogIngesterQueryParams>,
215 Path(pipeline_name): Path<String>,
216) -> Result<GreptimedbManageResponse> {
217 let start = Instant::now();
218 let handler = state.log_handler;
219 ensure!(
220 !pipeline_name.is_empty(),
221 InvalidParameterSnafu {
222 reason: "pipeline_name is required in path",
223 }
224 );
225 ensure!(
226 !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
227 InvalidParameterSnafu {
228 reason: "built-in pipelines don't have fixed table schema",
229 }
230 );
231 let table_name = query_params.table.context(InvalidParameterSnafu {
232 reason: "table name is required",
233 })?;
234
235 let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
236
237 query_ctx.set_channel(Channel::Log);
238 let query_ctx = Arc::new(query_ctx);
239
240 let pipeline = handler
241 .get_pipeline(&pipeline_name, version, query_ctx.clone())
242 .await?;
243
244 let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
245 reason: "auto transform doesn't have fixed table schema",
246 })?;
247
248 let schema = query_ctx.current_schema();
249 let table_name_ref = TableReference {
250 catalog: query_ctx.current_catalog(),
251 schema: &schema,
252 table: &table_name,
253 };
254
255 let mut create_table_expr =
256 create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
257 .map_err(BoxedError::new)
258 .context(OtherSnafu)?;
259
260 create_table_expr
262 .table_options
263 .insert(APPEND_MODE_KEY.to_string(), "true".to_string());
264
265 let expr = expr_to_create(&create_table_expr, None)
266 .map_err(BoxedError::new)
267 .context(OtherSnafu)?;
268
269 let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() {
270 Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
271 } else if pipeline.is_variant_table_name() {
272 Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
273 } else {
274 None
275 };
276
277 let sql = SqlOutput {
278 sql: format!("{:#}", expr),
279 message,
280 };
281
282 Ok(GreptimedbManageResponse::from_sql(
283 sql,
284 start.elapsed().as_millis() as u64,
285 ))
286}
287
288#[axum_macros::debug_handler]
289pub async fn add_pipeline(
290 State(state): State<LogState>,
291 Path(pipeline_name): Path<String>,
292 Extension(mut query_ctx): Extension<QueryContext>,
293 PipelineContent(payload): PipelineContent,
294) -> Result<GreptimedbManageResponse> {
295 let start = Instant::now();
296 let handler = state.log_handler;
297 ensure!(
298 !pipeline_name.is_empty(),
299 InvalidParameterSnafu {
300 reason: "pipeline_name is required in path",
301 }
302 );
303 ensure!(
304 !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
305 InvalidParameterSnafu {
306 reason: "pipeline_name cannot start with greptime_",
307 }
308 );
309 ensure!(
310 !payload.is_empty(),
311 InvalidParameterSnafu {
312 reason: "pipeline is required in body",
313 }
314 );
315
316 query_ctx.set_channel(Channel::Log);
317 let query_ctx = Arc::new(query_ctx);
318
319 let content_type = "yaml";
320 let result = handler
321 .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
322 .await;
323
324 result
325 .map(|pipeline| {
326 GreptimedbManageResponse::from_pipeline(
327 pipeline_name,
328 pipeline.0.to_timezone_aware_string(None),
329 start.elapsed().as_millis() as u64,
330 None,
331 )
332 })
333 .map_err(|e| {
334 error!(e; "failed to insert pipeline");
335 e
336 })
337}
338
339#[axum_macros::debug_handler]
340pub async fn delete_pipeline(
341 State(state): State<LogState>,
342 Extension(mut query_ctx): Extension<QueryContext>,
343 Query(query_params): Query<LogIngesterQueryParams>,
344 Path(pipeline_name): Path<String>,
345) -> Result<GreptimedbManageResponse> {
346 let start = Instant::now();
347 let handler = state.log_handler;
348 ensure!(
349 !pipeline_name.is_empty(),
350 InvalidParameterSnafu {
351 reason: "pipeline_name is required",
352 }
353 );
354
355 let version_str = query_params.version.context(InvalidParameterSnafu {
356 reason: "version is required",
357 })?;
358
359 let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
360
361 query_ctx.set_channel(Channel::Log);
362 let query_ctx = Arc::new(query_ctx);
363
364 handler
365 .delete_pipeline(&pipeline_name, version, query_ctx)
366 .await
367 .map(|v| {
368 if v.is_some() {
369 GreptimedbManageResponse::from_pipeline(
370 pipeline_name,
371 version_str,
372 start.elapsed().as_millis() as u64,
373 None,
374 )
375 } else {
376 GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
377 }
378 })
379 .map_err(|e| {
380 error!(e; "failed to delete pipeline");
381 e
382 })
383}
384
385fn transform_ndjson_array_factory(
388 values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
389 ignore_error: bool,
390) -> Result<Vec<VrlValue>> {
391 values
392 .into_iter()
393 .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
394 Ok(item_value) => {
395 match item_value {
396 VrlValue::Array(item_array) => {
397 acc_array.extend(item_array);
398 }
399 VrlValue::Object(_) => {
400 acc_array.push(item_value);
401 }
402 _ => {
403 if !ignore_error {
404 warn!("invalid item in array: {:?}", item_value);
405 return InvalidParameterSnafu {
406 reason: format!("invalid item: {} in array", item_value),
407 }
408 .fail();
409 }
410 }
411 }
412 Ok(acc_array)
413 }
414 Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
415 Err(_) => {
416 warn!("invalid item in array: {:?}", item);
417 Ok(acc_array)
418 }
419 })
420}
421
422async fn dryrun_pipeline_inner(
424 value: Vec<VrlValue>,
425 pipeline: Arc<pipeline::Pipeline>,
426 pipeline_handler: PipelineHandlerRef,
427 query_ctx: &QueryContextRef,
428) -> Result<Response> {
429 let params = GreptimePipelineParams::default();
430
431 let pipeline_def = PipelineDefinition::Resolved(pipeline);
432 let pipeline_ctx = PipelineContext::new(&pipeline_def, ¶ms, query_ctx.channel());
433 let results = run_pipeline(
434 &pipeline_handler,
435 &pipeline_ctx,
436 PipelineIngestRequest {
437 table: "dry_run".to_owned(),
438 values: value,
439 },
440 query_ctx,
441 true,
442 )
443 .await?;
444
445 let column_type_key = "column_type";
446 let data_type_key = "data_type";
447 let name_key = "name";
448
449 let results = results
450 .all_req()
451 .filter_map(|row| {
452 if let Some(rows) = row.rows {
453 let table_name = row.table_name;
454 let result_schema = rows.schema;
455
456 let schema = result_schema
457 .iter()
458 .map(|cs| {
459 let mut map = Map::new();
460 map.insert(
461 name_key.to_string(),
462 JsonValue::String(cs.column_name.clone()),
463 );
464 map.insert(
465 data_type_key.to_string(),
466 JsonValue::String(cs.datatype().as_str_name().to_string()),
467 );
468 map.insert(
469 column_type_key.to_string(),
470 JsonValue::String(cs.semantic_type().as_str_name().to_string()),
471 );
472 map.insert(
473 "fulltext".to_string(),
474 JsonValue::Bool(
475 cs.options
476 .clone()
477 .is_some_and(|x| x.options.contains_key("fulltext")),
478 ),
479 );
480 JsonValue::Object(map)
481 })
482 .collect::<Vec<_>>();
483
484 let rows = rows
485 .rows
486 .into_iter()
487 .map(|row| {
488 row.values
489 .into_iter()
490 .enumerate()
491 .map(|(idx, v)| {
492 let mut map = Map::new();
493 let value_ref = pb_value_to_value_ref(
494 &v,
495 result_schema[idx].datatype_extension.as_ref(),
496 );
497 let greptime_value: datatypes::value::Value = value_ref.into();
498 let serde_json_value =
499 serde_json::Value::try_from(greptime_value).unwrap();
500 map.insert("value".to_string(), serde_json_value);
501 map.insert("key".to_string(), schema[idx][name_key].clone());
502 map.insert(
503 "semantic_type".to_string(),
504 schema[idx][column_type_key].clone(),
505 );
506 map.insert(
507 "data_type".to_string(),
508 schema[idx][data_type_key].clone(),
509 );
510 JsonValue::Object(map)
511 })
512 .collect()
513 })
514 .collect();
515
516 let mut result = Map::new();
517 result.insert("schema".to_string(), JsonValue::Array(schema));
518 result.insert("rows".to_string(), JsonValue::Array(rows));
519 result.insert("table_name".to_string(), JsonValue::String(table_name));
520 let result = JsonValue::Object(result);
521 Some(result)
522 } else {
523 None
524 }
525 })
526 .collect();
527 Ok(Json(JsonValue::Array(results)).into_response())
528}
529
/// JSON body accepted by `pipeline_dryrun` in its structured form.
///
/// `check_pipeline_dryrun_params_valid` only accepts a body in which at least
/// one of `pipeline` and `pipeline_name` is present.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    /// Name of a stored pipeline; used when `pipeline` is absent.
    pub pipeline_name: Option<String>,
    /// Version of the stored pipeline, parsed with `to_pipeline_version`.
    pub pipeline_version: Option<String>,
    /// Inline pipeline definition; when present it is built and used directly.
    pub pipeline: Option<String>,
    /// Content type of `data`; `pipeline_dryrun` falls back to
    /// `application/json` when it is absent.
    pub data_type: Option<String>,
    /// The raw input data to run through the pipeline.
    pub data: String,
}
543
544fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
548 match serde_json::from_slice::<PipelineDryrunParams>(payload) {
549 Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
551 Ok(_) => None,
553 Err(_) => None,
555 }
556}
557
558fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
560 pipeline_name.context(InvalidParameterSnafu {
561 reason: "pipeline_name is required",
562 })
563}
564
565fn check_data_valid(data_len: usize) -> Result<()> {
567 ensure!(
568 data_len <= 10,
569 InvalidParameterSnafu {
570 reason: "data is required",
571 }
572 );
573 Ok(())
574}
575
576fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
577 let body = Json(json!({
578 "error": format!("{}: {}", step_msg,e.output_msg()),
579 }));
580
581 (status_code_to_http_status(&e.status_code()), body).into_response()
582}
583
584fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
588 if let Ok(content_type) = ContentType::from_str(&data_type) {
589 extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
590 } else {
591 InvalidParameterSnafu {
592 reason: format!(
593 "invalid content type: {}, expected: one of {}",
594 data_type,
595 EventPayloadResolver::support_content_type_list().join(", ")
596 ),
597 }
598 .fail()
599 }
600}
601
602#[axum_macros::debug_handler]
603pub async fn pipeline_dryrun(
604 State(log_state): State<LogState>,
605 Query(query_params): Query<LogIngesterQueryParams>,
606 Extension(mut query_ctx): Extension<QueryContext>,
607 TypedHeader(content_type): TypedHeader<ContentType>,
608 payload: Bytes,
609) -> Result<Response> {
610 let handler = log_state.log_handler;
611
612 query_ctx.set_channel(Channel::Log);
613 let query_ctx = Arc::new(query_ctx);
614
615 match check_pipeline_dryrun_params_valid(&payload) {
616 Some(params) => {
617 let data = parse_dryrun_data(
618 params.data_type.unwrap_or("application/json".to_string()),
619 params.data,
620 )?;
621
622 check_data_valid(data.len())?;
623
624 match params.pipeline {
625 None => {
626 let version = to_pipeline_version(params.pipeline_version.as_deref())
627 .context(PipelineSnafu)?;
628 let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
629 let pipeline = handler
630 .get_pipeline(&pipeline_name, version, query_ctx.clone())
631 .await?;
632 dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
633 }
634 Some(pipeline) => {
635 let pipeline = handler.build_pipeline(&pipeline);
636 match pipeline {
637 Ok(pipeline) => {
638 match dryrun_pipeline_inner(
639 data,
640 Arc::new(pipeline),
641 handler,
642 &query_ctx,
643 )
644 .await
645 {
646 Ok(response) => Ok(response),
647 Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
648 "Failed to exec pipeline",
649 e,
650 )),
651 }
652 }
653 Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
654 "Failed to build pipeline",
655 e,
656 )),
657 }
658 }
659 }
660 }
661 None => {
662 let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
666
667 let version =
668 to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
669
670 let ignore_errors = query_params.ignore_errors.unwrap_or(false);
671
672 let value =
673 extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
674
675 check_data_valid(value.len())?;
676
677 let pipeline = handler
678 .get_pipeline(&pipeline_name, version, query_ctx.clone())
679 .await?;
680
681 dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
682 }
683 }
684}
685
686pub(crate) fn extract_pipeline_params_map_from_headers(
687 headers: &HeaderMap,
688) -> ahash::HashMap<String, String> {
689 GreptimePipelineParams::parse_header_str_to_map(
690 headers
691 .get(GREPTIME_PIPELINE_PARAMS_HEADER)
692 .and_then(|v| v.to_str().ok()),
693 )
694}
695
696#[axum_macros::debug_handler]
697pub async fn log_ingester(
698 State(log_state): State<LogState>,
699 Query(query_params): Query<LogIngesterQueryParams>,
700 Extension(mut query_ctx): Extension<QueryContext>,
701 TypedHeader(content_type): TypedHeader<ContentType>,
702 headers: HeaderMap,
703 payload: Bytes,
704) -> Result<HttpResponse> {
705 let source = query_params.source.as_deref();
707 let response = match &log_state.log_validator {
708 Some(validator) => validator.validate(source, &payload).await,
709 None => None,
710 };
711 if let Some(response) = response {
712 return response;
713 }
714
715 let handler = log_state.log_handler;
716
717 let table_name = query_params.table.context(InvalidParameterSnafu {
718 reason: "table is required",
719 })?;
720
721 let ignore_errors = query_params.ignore_errors.unwrap_or(false);
722
723 let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
724 reason: "pipeline_name is required",
725 })?;
726 let skip_error = query_params.skip_error.unwrap_or(false);
727 let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
728 let pipeline = PipelineDefinition::from_name(
729 &pipeline_name,
730 version,
731 query_params.custom_time_index.map(|s| (s, ignore_errors)),
732 )
733 .context(PipelineSnafu)?;
734
735 let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
736
737 query_ctx.set_channel(Channel::Log);
738 let query_ctx = Arc::new(query_ctx);
739
740 let value = log_state
741 .ingest_interceptor
742 .as_ref()
743 .pre_pipeline(value, query_ctx.clone())?;
744
745 let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
746 if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
747 pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
748 }
749 let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);
750
751 ingest_logs_inner(
752 handler,
753 pipeline,
754 vec![PipelineIngestRequest {
755 table: table_name,
756 values: value,
757 }],
758 query_ctx,
759 pipeline_params,
760 )
761 .await
762}
763
/// The payload formats the log ingestion endpoints can parse.
///
/// `EnumIter` is derived so the list of supported content types can be built
/// by iterating over the variants.
#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    /// `application/json`: a stream of JSON documents (objects or arrays).
    Json,
    /// `application/<ndjson subtype>`: one JSON document per line.
    Ndjson,
    /// `text/plain`: each non-empty line becomes an object with a `message` field.
    Text,
}
770
771impl Display for EventPayloadResolverInner {
772 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
773 match self {
774 EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
775 EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
776 EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
777 }
778 }
779}
780
781impl TryFrom<&ContentType> for EventPayloadResolverInner {
782 type Error = Error;
783
784 fn try_from(content_type: &ContentType) -> Result<Self> {
785 let mime: mime_guess::Mime = content_type.clone().into();
786 match (mime.type_(), mime.subtype()) {
787 (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
788 (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
789 Ok(EventPayloadResolverInner::Ndjson)
790 }
791 (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
792 _ => InvalidParameterSnafu {
793 reason: format!(
794 "invalid content type: {}, expected: one of {}",
795 content_type,
796 EventPayloadResolver::support_content_type_list().join(", ")
797 ),
798 }
799 .fail(),
800 }
801 }
802}
803
/// Parser for an ingestion payload, selected from the request content type.
#[derive(Debug)]
struct EventPayloadResolver<'a> {
    /// The payload format that decides how `parse_payload` reads the bytes.
    inner: EventPayloadResolverInner,
    /// The content type this resolver was created from. It is stored but not
    /// read by the code in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}
812
813impl EventPayloadResolver<'_> {
814 pub(super) fn support_content_type_list() -> Vec<String> {
815 EventPayloadResolverInner::iter()
816 .map(|x| x.to_string())
817 .collect()
818 }
819}
820
821impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
822 type Error = Error;
823
824 fn try_from(content_type: &'a ContentType) -> Result<Self> {
825 let inner = EventPayloadResolverInner::try_from(content_type)?;
826 Ok(EventPayloadResolver {
827 inner,
828 content_type,
829 })
830 }
831}
832
833impl EventPayloadResolver<'_> {
834 fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
835 match self.inner {
836 EventPayloadResolverInner::Json => transform_ndjson_array_factory(
837 Deserializer::from_slice(&payload).into_iter(),
838 ignore_errors,
839 ),
840 EventPayloadResolverInner::Ndjson => {
841 let mut result = Vec::with_capacity(1000);
842 let mut buffer = Buffers::new(1000);
843 for (index, line) in payload.lines().enumerate() {
844 let mut line = match line {
845 Ok(line) if !line.is_empty() => line,
846 Ok(_) => continue, Err(_) if ignore_errors => continue,
848 Err(e) => {
849 warn!(e; "invalid string at index: {}", index);
850 return InvalidParameterSnafu {
851 reason: format!("invalid line at index: {}", index),
852 }
853 .fail();
854 }
855 };
856
857 if let Ok(v) = simd_json::serde::from_slice_with_buffers(
860 unsafe { line.as_bytes_mut() },
861 &mut buffer,
862 ) {
863 result.push(v);
864 } else if !ignore_errors {
865 warn!("invalid JSON at index: {}, content: {:?}", index, line);
866 return InvalidParameterSnafu {
867 reason: format!("invalid JSON at index: {}", index),
868 }
869 .fail();
870 }
871 }
872 Ok(result)
873 }
874 EventPayloadResolverInner::Text => {
875 let result = payload
876 .lines()
877 .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
878 .map(|line| {
879 let mut map = BTreeMap::new();
880 map.insert(
881 KeyString::from("message"),
882 VrlValue::Bytes(Bytes::from(line)),
883 );
884 VrlValue::Object(map)
885 })
886 .collect::<Vec<_>>();
887 Ok(result)
888 }
889 }
890 }
891}
892
893fn extract_pipeline_value_by_content_type(
894 content_type: ContentType,
895 payload: Bytes,
896 ignore_errors: bool,
897) -> Result<Vec<VrlValue>> {
898 EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
899 resolver
900 .parse_payload(payload, ignore_errors)
901 .map_err(|e| match &e {
902 Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
903 if reason.contains("invalid item:") {
904 InvalidParameterSnafu {
905 reason: "json format error, please check the date is valid JSON.",
906 }
907 .build()
908 } else {
909 e
910 }
911 }
912 _ => e,
913 })
914 })
915}
916
917pub(crate) async fn ingest_logs_inner(
918 handler: PipelineHandlerRef,
919 pipeline: PipelineDefinition,
920 log_ingest_requests: Vec<PipelineIngestRequest>,
921 query_ctx: QueryContextRef,
922 pipeline_params: GreptimePipelineParams,
923) -> Result<HttpResponse> {
924 let exec_timer = Instant::now();
927 let mut req = ContextReq::default();
928
929 let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
930 for pipeline_req in log_ingest_requests {
931 let requests =
932 run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;
933
934 req.merge(requests);
935 }
936
937 execute_log_context_req(
938 handler,
939 req,
940 query_ctx,
941 exec_timer,
942 &METRIC_HTTP_LOGS_INGESTION_COUNTER,
943 &METRIC_HTTP_LOGS_INGESTION_ELAPSED,
944 )
945 .await
946}
947
948pub(crate) async fn execute_log_context_req(
949 handler: PipelineHandlerRef,
950 ctx_req: ContextReq,
951 query_ctx: QueryContextRef,
952 exec_timer: Instant,
953 counter: &IntCounterVec,
954 elapsed: &HistogramVec,
955) -> Result<HttpResponse> {
956 let db = query_ctx.get_db_string();
957
958 let mut outputs = Vec::with_capacity(ctx_req.map_len());
959 let mut total_rows: u64 = 0;
960 let mut fail = false;
961 for (temp_ctx, act_req) in ctx_req.as_req_iter(query_ctx) {
962 let output = handler.insert(act_req, temp_ctx).await;
963
964 if let Ok(Output {
965 data: OutputData::AffectedRows(rows),
966 meta: _,
967 }) = &output
968 {
969 total_rows += *rows as u64;
970 } else {
971 fail = true;
972 }
973 outputs.push(output);
974 }
975
976 if total_rows > 0 {
978 counter.with_label_values(&[db.as_str()]).inc_by(total_rows);
979 elapsed
980 .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
981 .observe(exec_timer.elapsed().as_secs_f64());
982 }
983 if fail {
984 elapsed
985 .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
986 .observe(exec_timer.elapsed().as_secs_f64());
987 }
988
989 let response = GreptimedbV1Response::from_output(outputs)
990 .await
991 .with_execution_time(exec_timer.elapsed().as_millis() as u64);
992 Ok(response)
993}
994
/// Hook that can inspect an incoming log payload before it is ingested.
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validates `payload`, given the optional `source` query parameter.
    ///
    /// Returning `Some(result)` makes `log_ingester` return `result`
    /// immediately; returning `None` lets ingestion continue.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
    -> Option<Result<HttpResponse>>;
}

/// Shared, reference-counted [`LogValidator`].
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
1004
/// Shared state of the log ingestion and pipeline management HTTP handlers.
#[derive(Clone)]
pub struct LogState {
    /// Handler used for pipeline management, pipeline execution and inserts.
    pub log_handler: PipelineHandlerRef,
    /// Optional validator consulted by `log_ingester` before ingestion.
    pub log_validator: Option<LogValidatorRef>,
    /// Optional interceptor whose `pre_pipeline` hook is applied to the parsed
    /// values in `log_ingester`.
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
1012
#[cfg(test)]
mod tests {

    use super::*;

    /// Objects and arrays, in one or several documents, are flattened into a
    /// single array of objects.
    #[test]
    fn test_transform_ndjson() {
        // Two newline-separated objects.
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        // A single object.
        let s = "{\"a\": 1}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        // An array with one object.
        let s = "[{\"a\": 1}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        // An array with two objects.
        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }

    /// With `ignore_errors` set, the JSON parser keeps only what precedes the
    /// first broken document, while the NDJSON parser drops just the broken line.
    #[test]
    fn test_extract_by_content() {
        // The second line is intentionally malformed JSON.
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1}).into()]);

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert(KeyString::from("a"), VrlValue::Integer(1));
        let map1 = VrlValue::Object(map1);
        let mut map2 = BTreeMap::new();
        map2.insert(KeyString::from("c"), VrlValue::Integer(1));
        let map2 = VrlValue::Object(map2);
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
}