use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{
    ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

/// Query parameters accepted by the log-ingestion HTTP APIs. All fields are
/// optional at this layer; each handler enforces the parameters it needs.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database to write to.
    pub db: Option<String>,
    /// The destination table.
    pub table: Option<String>,
    /// The name of the pipeline used to process the payload.
    pub pipeline_name: Option<String>,
    /// The pipeline version; the latest version is used when absent.
    pub version: Option<String>,
    /// Whether malformed entries are skipped instead of failing the request.
    pub ignore_errors: Option<bool>,
    /// The source of the logs, passed to the configured [`LogValidator`].
    pub source: Option<String>,
    /// The field in the input that carries the log message.
    pub msg_field: Option<String>,
    /// Use a field from the input as the time index instead of the
    /// server-side timestamp.
    pub custom_time_index: Option<String>,
}

/// One batch of pipeline input destined for a single table.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table where the log data will be written to.
    pub table: String,
    /// The rows of the log data.
    pub values: Vec<PipelineMap>,
}

pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

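/// Fetch the source of a pipeline by name, optionally pinned to a `version`
/// query parameter; responds with the pipeline body and its version string.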
#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

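/// Create a pipeline from a YAML definition. The body is accepted either with
/// a Content-Type ending in `yaml` or as the first field of a multipart
/// upload (see [`PipelineContent`]); names starting with `greptime_` are
/// reserved for internal pipelines.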
#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

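/// Delete one version of a pipeline; both the name (path) and `version`
/// (query parameter) are required.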
#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

/// Flatten a stream of JSON documents (a single array, or objects one after
/// another as in NDJSON) into a flat `Vec` of JSON objects. With
/// `ignore_error` set, malformed or non-object items are skipped instead of
/// failing the whole batch.
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<Value, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<Value>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    Value::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    Value::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

/// Run `value` through `pipeline` against a scratch table and render the
/// transformed rows, together with their schema, as a JSON response.
async fn dryrun_pipeline_inner(
    value: Vec<PipelineMap>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
                        map.insert(
                            data_type_key.to_string(),
                            Value::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            Value::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            Value::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        Value::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][column_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        Value::Object(map)
                                    })
                                    .unwrap_or(Value::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), Value::Array(schema));
                result.insert("rows".to_string(), Value::Array(rows));
                result.insert("table_name".to_string(), Value::String(table_name));
                let result = Value::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(Value::Array(results)).into_response())
}

/// Body accepted by the dryrun API: the sample data to process, plus either an
/// inline pipeline definition or a reference to a stored pipeline by name
/// (and optional version).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data_type: Option<String>,
    pub data: String,
}

/// Accept the body as dryrun parameters only when it is valid JSON and names a
/// pipeline, either inline (`pipeline`) or by reference (`pipeline_name`).
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        Ok(_) => None,
        Err(_) => None,
    }
}

/// Require a pipeline name, mapping `None` to an invalid-parameter error.
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

/// Cap the dryrun sample size so a dryrun stays cheap.
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data length must not exceed 10 entries",
        }
    );
    Ok(())
}

fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

/// Parse the dryrun sample according to its declared content type.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineMap>> {
    if let Ok(content_type) = ContentType::from_str(&data_type) {
        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
    } else {
        InvalidParameterSnafu {
            reason: format!(
                "invalid content type: {}, expected: one of {}",
                data_type,
                EventPayloadResolver::support_content_type_list().join(", ")
            ),
        }
        .fail()
    }
}

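/// Dryrun a pipeline against a small data sample without writing anything.
///
/// Two request shapes are accepted: the raw sample as the body (with
/// `pipeline_name`/`version` in the query string), or a JSON body matching
/// [`PipelineDryrunParams`]. A sketch of the latter, with illustrative field
/// values:
///
/// ```json
/// {
///   "pipeline_name": "my_pipeline",
///   "data_type": "application/json",
///   "data": "[{\"message\": \"hello world\"}]"
/// }
/// ```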
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // The body is the raw data sample; pipeline name and version come
            // from the query parameters.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

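/// Ingest a log payload through a pipeline. `table` and `pipeline_name` are
/// required query parameters; the payload format (JSON, NDJSON, or plain
/// text) is chosen by the `Content-Type` header.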
#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    let source = query_params.source.as_deref();
    // Give a configured validator the chance to reject the payload early.
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        headers,
    )
    .await
}

#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    Json,
    Ndjson,
    Text,
}

impl Display for EventPayloadResolverInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
        }
    }
}

impl TryFrom<&ContentType> for EventPayloadResolverInner {
    type Error = Error;

    fn try_from(content_type: &ContentType) -> Result<Self> {
        match content_type {
            x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
            x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
            x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
                Ok(EventPayloadResolverInner::Text)
            }
            _ => InvalidParameterSnafu {
                reason: format!(
                    "invalid content type: {}, expected: one of {}",
                    content_type,
                    EventPayloadResolver::support_content_type_list().join(", ")
                ),
            }
            .fail(),
        }
    }
}

#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    // Not read after resolution today; kept so parsing can later consult
    // content-type parameters such as the charset.
    #[allow(dead_code)]
    content_type: &'a ContentType,
}

impl EventPayloadResolver<'_> {
    pub(super) fn support_content_type_list() -> Vec<String> {
        EventPayloadResolverInner::iter()
            .map(|x| x.to_string())
            .collect()
    }
}

impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
    type Error = Error;

    fn try_from(content_type: &'a ContentType) -> Result<Self> {
        let inner = EventPayloadResolverInner::try_from(content_type)?;
        Ok(EventPayloadResolver {
            inner,
            content_type,
        })
    }
}

impl EventPayloadResolver<'_> {
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineMap>> {
        match self.inner {
            EventPayloadResolverInner::Json => {
                pipeline::json_array_to_map(transform_ndjson_array_factory(
                    Deserializer::from_slice(&payload).into_iter(),
                    ignore_errors,
                )?)
                .context(PipelineSnafu)
            }
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        Ok(_) => continue,
                        Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // simd_json parses in place and may mutate the buffer, which
                    // is safe here because `line` is an owned String local to
                    // this iteration.
                    if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                        let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = PipelineMap::new();
                        map.insert("message".to_string(), pipeline::Value::String(line));
                        map
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}

fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
        resolver
            .parse_payload(payload, ignore_errors)
            .map_err(|e| match &e {
                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
                    if reason.contains("invalid item:") {
                        InvalidParameterSnafu {
                            reason: "json format error, please check the data is valid JSON.",
                        }
                        .build()
                    } else {
                        e
                    }
                }
                _ => e,
            })
    })
}

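/// Run every request through the pipeline, group the resulting inserts by
/// context, execute them, and record per-database ingestion metrics.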
pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    headers: HeaderMap,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut req = ContextReq::default();

    let pipeline_params = GreptimePipelineParams::from_params(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    );

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload for a given source before ingestion. Returning
    /// `Some(response)` short-circuits the request with that response.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

/// Shared state for the log-ingestion handlers.
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

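    // A minimal sketch of how a query string maps onto LogIngesterQueryParams:
    // every field is optional at deserialization time (handlers enforce their
    // own requirements). JSON is used here only as a convenient serde input.
    #[test]
    fn test_query_params_all_optional() {
        let params: LogIngesterQueryParams =
            serde_json::from_value(json!({"table": "logs", "pipeline_name": "p1"})).unwrap();
        assert_eq!(params.table.as_deref(), Some("logs"));
        assert_eq!(params.pipeline_name.as_deref(), Some("p1"));
        assert!(params.version.is_none());
        assert!(params.ignore_errors.is_none());
    }

    // The dryrun body is only treated as PipelineDryrunParams when it is valid
    // JSON and names a pipeline inline or by reference.
    #[test]
    fn test_check_pipeline_dryrun_params_valid() {
        let by_name = Bytes::from_static(br#"{"pipeline_name": "p1", "data": "{}"}"#);
        assert!(check_pipeline_dryrun_params_valid(&by_name).is_some());

        let no_pipeline = Bytes::from_static(br#"{"data": "{}"}"#);
        assert!(check_pipeline_dryrun_params_valid(&no_pipeline).is_none());

        let not_json = Bytes::from_static(b"not json");
        assert!(check_pipeline_dryrun_params_valid(&not_json).is_none());
    }

    // The dryrun sample-size guard accepts at most 10 entries.
    #[test]
    fn test_check_data_valid() {
        assert!(check_data_valid(1).is_ok());
        assert!(check_data_valid(10).is_ok());
        assert!(check_data_valid(11).is_err());
    }
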
    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }

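    // With ignore_error set, scalar items that are neither objects nor arrays
    // are skipped; without it, the same input fails the whole batch.
    #[test]
    fn test_transform_ndjson_ignore_error() {
        let s = "1\n{\"a\": 1}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), true).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        assert!(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).is_err()
        );
    }
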
    #[test]
    fn test_extract_by_content() {
        let payload = r#"
{"a": 1}
{"b": 2"}
{"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = PipelineMap::new();
        map1.insert("a".to_string(), pipeline::Value::Uint64(1));
        let mut map2 = PipelineMap::new();
        map2.insert("c".to_string(), pipeline::Value::Uint64(1));
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
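
    // Content-type resolution: JSON, NDJSON, and plain text (including UTF-8
    // text) are supported; anything else is rejected up front.
    #[test]
    fn test_event_payload_resolver_content_types() {
        assert!(EventPayloadResolver::try_from(&*JSON_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&*NDJSON_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&*TEXT_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&*TEXT_UTF8_CONTENT_TYPE).is_ok());
        assert!(EventPayloadResolver::try_from(&ContentType::xml()).is_err());
    }

    // Plain-text payloads: every non-empty line becomes one map holding the
    // line under the "message" key.
    #[test]
    fn test_extract_text_content() {
        let payload = Bytes::from_static(b"line one\n\nline two\n");
        let result =
            extract_pipeline_value_by_content_type(ContentType::text(), payload, false).unwrap();

        let mut map1 = PipelineMap::new();
        map1.insert(
            "message".to_string(),
            pipeline::Value::String("line one".to_string()),
        );
        let mut map2 = PipelineMap::new();
        map2.insert(
            "message".to_string(),
            pipeline::Value::String("line two".to_string()),
        );
        assert_eq!(result, vec![map1, map2]);
    }

    // A sketch of a custom LogValidator; `RejectBlockedSource` is illustrative,
    // not part of the crate. Returning Some(...) from validate short-circuits
    // log_ingester with that response.
    struct RejectBlockedSource;

    #[async_trait]
    impl LogValidator for RejectBlockedSource {
        async fn validate(
            &self,
            source: Option<&str>,
            _payload: &Bytes,
        ) -> Option<Result<HttpResponse>> {
            (source == Some("blocked")).then(|| {
                InvalidParameterSnafu {
                    reason: "source is blocked",
                }
                .fail()
            })
        }
    }

    #[test]
    fn test_log_validator_as_trait_object() {
        let _validator: LogValidatorRef = Arc::new(RejectBlockedSource);
    }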
}