use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use pipeline::util::to_pipeline_version;
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value as PipelineValue,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value as JsonValue};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{
    CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

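/// Query parameters shared by the log ingestion and pipeline management
/// handlers below. An illustrative ingestion query string (names are the
/// field names of this struct):
/// `?db=public&table=logs&pipeline_name=my_pipeline&ignore_errors=true`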
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database to ingest into.
    pub db: Option<String>,
    /// The table to ingest into.
    pub table: Option<String>,
    /// The name of the pipeline used to process the payload.
    pub pipeline_name: Option<String>,
    /// The version of the pipeline; the latest version is used if unset.
    pub version: Option<String>,
    /// Whether to ignore invalid entries in the payload instead of failing.
    pub ignore_errors: Option<bool>,
    /// The source of the payload, forwarded to the optional log validator.
    pub source: Option<String>,
    /// The field containing the log message (unused by the handlers in this
    /// module).
    pub msg_field: Option<String>,
    /// Use this field as the time index instead of the pipeline's timestamp.
    pub custom_time_index: Option<String>,
    /// Whether to skip rows that fail pipeline processing instead of failing
    /// the whole request.
    pub skip_error: Option<bool>,
}

/// A request to ingest a batch of values into one table via a pipeline.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table to ingest into.
    pub table: String,
    /// The rows to run through the pipeline and ingest.
    pub values: Vec<PipelineValue>,
}

/// The raw pipeline definition text extracted from the request body, either
/// as a YAML body or as the first field of a multipart form.
pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Flattens a stream of parsed JSON values into a single vector: top-level
/// arrays are spliced in and objects are kept as-is. Scalar items and parse
/// errors fail the whole batch unless `ignore_error` is set, in which case
/// they are skipped.
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<JsonValue, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<JsonValue>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    JsonValue::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    JsonValue::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Runs `value` through the given pipeline without persisting anything and
/// renders the resulting schema and rows as a JSON response.
async fn dryrun_pipeline_inner(
    value: Vec<PipelineValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let colume_type_key = "colume_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            colume_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][colume_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        JsonValue::Object(map)
                                    })
                                    .unwrap_or(JsonValue::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}

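/// Body of a pipeline dry-run request. Either `pipeline` (an inline pipeline
/// definition) or `pipeline_name` (optionally with `pipeline_version`) must
/// be present. `data` carries the sample input and `data_type` its content
/// type, defaulting to `application/json`. An illustrative body (the
/// pipeline name is a placeholder):
///
/// ```json
/// {
///     "pipeline_name": "my_pipeline",
///     "data": "[{\"message\": \"hello\"}]"
/// }
/// ```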
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}

/// Parses the dry-run request body, returning `Some` only when it is valid
/// JSON that names a pipeline to run.
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // Valid params must specify either an inline pipeline or a pipeline name.
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // Otherwise the caller falls back to the query-parameter style request.
        Ok(_) => None,
        Err(_) => None,
    }
}

/// Requires a pipeline name to be present.
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Ensures the dry-run input is small enough; at most 10 rows are accepted.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "at most 10 rows are accepted in a dry run",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parses the dry-run `data` according to `data_type`, rejecting content
/// types that no payload resolver supports.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineValue>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected: one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // Not a dry-run params body: take the pipeline name and version
            // from the query params and treat the whole body as sample data.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

/// Parses the `GREPTIME_PIPELINE_PARAMS_HEADER` header into a key/value map
/// of pipeline parameters.
pub(crate) fn extract_pipeline_params_map_from_headers(
    headers: &HeaderMap,
) -> ahash::HashMap<String, String> {
    GreptimePipelineParams::parse_header_str_to_map(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    )
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    let source = query_params.source.as_deref();
    // Give the configured validator, if any, a chance to reject or answer the
    // payload before it is processed.
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        let mime: mime_guess::Mime = content_type.clone().into();
        match (mime.type_(), mime.subtype()) {
            (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
            (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
                Ok(EventPayloadResolverInner::Ndjson)
            }
            (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    /// The original content type, retained alongside the parsed `inner`
    /// variant; currently unused, hence the allow.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => {
                pipeline::json_array_to_map(transform_ndjson_array_factory(
                    Deserializer::from_slice(&payload).into_iter(),
                    ignore_errors,
                )?)
                .context(PipelineSnafu)
            }
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue,
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // simd_json parses the buffer in place, so the line has to
                    // be handed over as mutable bytes (hence the unsafe block).
                    if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                        let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert("message".to_string(), PipelineValue::String(line));
                        PipelineValue::Map(map.into())
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineValue>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

/// Validates log payloads before they are ingested.
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validates the payload coming from `source`. Returning `Some` answers
    /// the request with that response; `None` lets ingestion proceed.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

/// Axum state shared by the log ingestion handlers.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }
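
    // An added sketch of the lenient path: with ignore_error set, a top-level
    // scalar (which is neither an array nor an object) is skipped instead of
    // failing the whole batch.
    #[test]
    fn test_transform_ndjson_ignore_error() {
        let s = "1\n{\"a\": 1}";
        assert!(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).is_err()
        );

        let a = JsonValue::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), true).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");
    }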

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert("a".to_string(), PipelineValue::Uint64(1));
        let map1 = PipelineValue::Map(map1.into());
        let mut map2 = BTreeMap::new();
        map2.insert("c".to_string(), PipelineValue::Uint64(1));
        let map2 = PipelineValue::Map(map2.into());
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
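
    // Added examples: small sketches of the helper contracts above, based on
    // the implementations in this module.
    #[test]
    fn test_check_data_valid() {
        // At most 10 rows are accepted for a dry run.
        assert!(check_data_valid(1).is_ok());
        assert!(check_data_valid(10).is_ok());
        assert!(check_data_valid(11).is_err());
    }

    #[test]
    fn test_check_pipeline_dryrun_params_valid() {
        // A body naming a pipeline is accepted.
        let body = Bytes::from(r#"{"pipeline_name": "my_pipeline", "data": "[]"}"#);
        assert!(check_pipeline_dryrun_params_valid(&body).is_some());

        // A body with neither `pipeline` nor `pipeline_name` is not.
        let body = Bytes::from(r#"{"data": "[]"}"#);
        assert!(check_pipeline_dryrun_params_valid(&body).is_none());

        // Neither is a body that is not JSON at all.
        let body = Bytes::from("not json");
        assert!(check_pipeline_dryrun_params_valid(&body).is_none());
    }

    #[test]
    fn test_supported_content_types() {
        assert!(EventPayloadResolver::try_from(&*JSON_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&*NDJSON_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&*TEXT_CONTENT_TYPE).is_ok());

        let unsupported = ContentType::from_str("application/xml").unwrap();
        assert!(EventPayloadResolver::try_from(&unsupported).is_err());
    }

    #[test]
    fn test_extract_text_payload() {
        // Each non-empty text line becomes a map with a single "message" key.
        let payload = Bytes::from_static(b"hello\n\nworld\n");
        let values =
            extract_pipeline_value_by_content_type(ContentType::text(), payload, false).unwrap();

        let expected = vec!["hello", "world"]
            .into_iter()
            .map(|line| {
                let mut map = BTreeMap::new();
                map.insert("message".to_string(), PipelineValue::String(line.to_string()));
                PipelineValue::Map(map.into())
            })
            .collect::<Vec<_>>();
        assert_eq!(values, expected);
    }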
}