use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value as JsonValue, json};
use session::context::{Channel, QueryContext, QueryContextRef};
use simd_json::Buffers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::mito_engine_options::APPEND_MODE_KEY;
use strum::{EnumIter, IntoEnumIterator};
use table::table_reference::TableReference;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
    CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
    status_code_to_http_status,
};
use crate::http::HttpResponse;
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{
    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

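/// Query parameters accepted by the log-ingestion and pipeline-management
/// HTTP endpoints. An ingestion request typically looks like (illustrative;
/// the exact path depends on how the handlers are mounted):
/// `POST /v1/events/logs?db=public&table=logs&pipeline_name=my_pipeline`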
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database to ingest into.
    pub db: Option<String>,
    /// The table to ingest into.
    pub table: Option<String>,
    /// The name of the pipeline to run against the payload.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline; the latest version is used if unset.
    pub version: Option<String>,
    /// Whether to ignore invalid entries in the payload instead of failing.
    pub ignore_errors: Option<bool>,
    /// The source of the payload, passed to the log validator.
    pub source: Option<String>,
    /// The field carrying the log message, where applicable.
    pub msg_field: Option<String>,
    /// A custom time-index specification for the pipeline.
    pub custom_time_index: Option<String>,
    /// Whether to skip rows that fail the pipeline instead of failing the request.
    pub skip_error: Option<bool>,
}

/// A request to run a pipeline over a batch of values destined for one table.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written.
    pub table: String,
    /// The raw values to be processed by the pipeline.
    pub values: Vec<VrlValue>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

#[axum_macros::debug_handler]
pub async fn query_pipeline_ddl(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "built-in pipelines don't have a fixed table schema",
        }
    );
    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table name is required",
    })?;

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let pipeline = handler
        .get_pipeline(&pipeline_name, version, query_ctx.clone())
        .await?;

    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
        reason: "auto transform doesn't have a fixed table schema",
    })?;

    let schema = query_ctx.current_schema();
    let table_name_ref = TableReference {
        catalog: query_ctx.current_catalog(),
        schema: &schema,
        table: &table_name,
    };

    let mut create_table_expr =
        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
            .map_err(BoxedError::new)
            .context(OtherSnafu)?;

    // Pipeline-created log tables default to append mode.
    create_table_expr
        .table_options
        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());

    let expr = expr_to_create(&create_table_expr, None)
        .map_err(BoxedError::new)
        .context(OtherSnafu)?;

    let message = if handler
        .get_table(&table_name, &query_ctx)
        .await
        .context(CatalogSnafu)?
        .is_some()
    {
        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
    } else if pipeline.is_variant_table_name() {
        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
    } else {
        None
    };

    let sql = SqlOutput {
        sql: format!("{:#}", expr),
        message,
    };

    Ok(GreptimedbManageResponse::from_sql(
        sql,
        start.elapsed().as_millis() as u64,
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Flatten a stream of deserialized JSON values into a single `Vec<VrlValue>`:
/// arrays are spliced in element by element, objects are pushed as-is, and
/// anything else is rejected (or skipped when `ignore_error` is set).
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<VrlValue>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    VrlValue::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    VrlValue::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Run the pipeline over the given values without writing anything, and render
/// the resulting schemas and rows as a JSON response.
async fn dryrun_pipeline_inner(
    value: Vec<VrlValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    // NOTE: "colume_type" is kept as-is (typo included) to preserve the
    // existing response schema.
    let colume_type_key = "colume_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            colume_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][colume_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        JsonValue::Object(map)
                                    })
                                    .unwrap_or(JsonValue::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}

/// Parameters for the pipeline dryrun API. Either `pipeline_name` (with an
/// optional `pipeline_version`) or an inline raw `pipeline` definition must be
/// provided; `data_type` describes the content type of `data` (JSON by default).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}

/// Decide whether the body is a self-contained dryrun request: `Some(params)`
/// when the body is valid JSON naming a pipeline, `None` otherwise, in which
/// case the caller falls back to the legacy query-parameter interface.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // The new dryrun interface requires a pipeline or a pipeline name.
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // Valid JSON, but no pipeline is specified.
        Ok(_) => None,
        // Not valid JSON for this interface.
        Err(_) => None,
    }
}

/// Require a pipeline name to be present.
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Limit dryrun input to at most 10 rows.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data cannot exceed 10 rows",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parse the dryrun `data` string according to `data_type`, a content-type
/// string such as `application/json`.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // Fall back to the legacy dryrun interface: the pipeline name and
            // version come from the query string, and the raw body is the data.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

pub(crate) fn extract_pipeline_params_map_from_headers(
    headers: &HeaderMap,
) -> ahash::HashMap<String, String> {
    GreptimePipelineParams::parse_header_str_to_map(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    )
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // Give the (optional) log validator a chance to short-circuit the request.
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    // The `skip_error` query parameter is only a fallback; the header-level
    // pipeline param takes precedence when both are set.
    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}
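
// Example (illustrative, not from the original source): ingesting NDJSON logs
// through `log_ingester`, assuming the handler is mounted on the usual HTTP
// route; adjust the path to your server's routing.
//
//   curl -X POST \
//     'http://127.0.0.1:4000/v1/events/logs?db=public&table=logs&pipeline_name=my_pipeline' \
//     -H 'Content-Type: application/x-ndjson' \
//     --data-binary $'{"msg":"hello"}\n{"msg":"world"}'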

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The original content type, kept for completeness; currently unused.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            ),
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                let mut buffer = Buffers::new(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue,
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // SAFETY: simd-json parses in place and may mutate the bytes;
                    // `line` is an owned String that is not re-parsed afterwards
                    // (it is only logged on failure).
                    if let Ok(v) = simd_json::serde::from_slice_with_buffers(
                        unsafe { line.as_bytes_mut() },
                        &mut buffer,
                    ) {
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert(
                            KeyString::from("message"),
                            VrlValue::Bytes(Bytes::from(line)),
                        );
                        VrlValue::Object(map)
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<VrlValue>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload from the given source before ingestion; returning
    /// `Some(response)` short-circuits the request with that response.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

/// Axum state shared by the log-ingestion HTTP handlers.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }

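    // Sketch tests (not in the original source) for the dryrun request checks
    // defined above.
    #[test]
    fn test_check_data_valid_limits() {
        assert!(check_data_valid(1).is_ok());
        assert!(check_data_valid(10).is_ok());
        assert!(check_data_valid(11).is_err());
    }

    #[test]
    fn test_check_pipeline_dryrun_params_valid() {
        // Valid JSON naming a pipeline: handled by the new dryrun interface.
        let ok = Bytes::from_static(br#"{"pipeline_name": "p", "data": "x"}"#);
        assert!(check_pipeline_dryrun_params_valid(&ok).is_some());

        // Valid JSON without a pipeline: falls back to the legacy interface.
        let missing = Bytes::from_static(br#"{"data": "x"}"#);
        assert!(check_pipeline_dryrun_params_valid(&missing).is_none());

        // Not JSON at all: also falls back.
        let not_json = Bytes::from_static(b"plain text");
        assert!(check_pipeline_dryrun_params_valid(&not_json).is_none());
    }
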
    #[test]
    fn test_extract_by_content() {
        let payload = r#"
 {"a": 1}
 {"b": 2"}
 {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1}).into()]);

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert(KeyString::from("a"), VrlValue::Integer(1));
        let map1 = VrlValue::Object(map1);
        let mut map2 = BTreeMap::new();
        map2.insert(KeyString::from("c"), VrlValue::Integer(1));
        let map2 = VrlValue::Object(map2);
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
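
    // A sketch (not from the original source) of the plain-text path: each
    // non-empty line becomes an object with a single "message" field.
    #[test]
    fn test_extract_text_payload() {
        let payload = Bytes::from_static(b"hello\n\nworld\n");
        let values =
            extract_pipeline_value_by_content_type(ContentType::text(), payload, false).unwrap();
        // The empty line is skipped.
        assert_eq!(values.len(), 2);

        let mut expected = BTreeMap::new();
        expected.insert(
            KeyString::from("message"),
            VrlValue::Bytes(Bytes::from("hello")),
        );
        assert_eq!(values[0], VrlValue::Object(expected));
    }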
}