use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;

use api::v1::RowInsertRequests;
use async_trait::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu,
    Result, UnsupportedContentTypeSnafu,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
    METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::query_handler::PipelineHandlerRef;

const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";

lazy_static! {
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}

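/// Query parameters accepted by the log-ingestion and pipeline-management
/// handlers. Every field is optional at the type level; each handler enforces
/// its own required subset (for example, `log_ingester` requires `table` and
/// `pipeline_name`).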
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    /// The database to write logs to.
    pub db: Option<String>,
    /// The destination table.
    pub table: Option<String>,
    /// The name of the pipeline to run the payload through.
    pub pipeline_name: Option<String>,
    /// The pipeline version; the latest version is used when unset.
    pub version: Option<String>,
    /// Whether to skip malformed entries instead of failing the request.
    pub ignore_errors: Option<bool>,
    /// An optional source tag passed to the configured log validator.
    pub source: Option<String>,
    /// The payload field holding the log message (not consumed by the
    /// handlers in this module).
    pub msg_field: Option<String>,
    /// An optional custom time-index specification, forwarded to
    /// `PipelineDefinition::from_name`.
    pub custom_time_index: Option<String>,
}

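/// A batch of rows bound for a single table: the unit of work handed to
/// `run_pipeline`.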
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    /// The table the rows will be written to.
    pub table: String,
    /// The rows to ingest, already parsed into pipeline maps.
    pub values: Vec<PipelineMap>,
}

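/// Axum extractor for a pipeline definition sent in the request body. It
/// accepts either a raw body whose `Content-Type` ends with "yaml" or the
/// first field of a `multipart/form-data` upload; any other shape is rejected
/// with `415 Unsupported Media Type`.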
pub struct PipelineContent(String);

impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}

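/// Handler that returns the stored definition of a pipeline, pinned to the
/// `version` query parameter when given and the latest version otherwise.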
#[axum_macros::debug_handler]
pub async fn query_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let (pipeline, pipeline_version) = handler
        .get_pipeline_str(&pipeline_name, version, query_ctx)
        .await?;

    Ok(GreptimedbManageResponse::from_pipeline(
        pipeline_name,
        query_params
            .version
            .unwrap_or(pipeline_version.0.to_iso8601_string()),
        start.elapsed().as_millis() as u64,
        Some(pipeline),
    ))
}

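/// Handler that creates a new pipeline (or a new version of an existing one)
/// from a YAML payload. Names starting with `greptime_` are reserved for
/// internal pipelines and rejected.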
#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(state): State<LogState>,
    Path(pipeline_name): Path<String>,
    Extension(mut query_ctx): Extension<QueryContext>,
    PipelineContent(payload): PipelineContent,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "pipeline_name cannot start with greptime_",
        }
    );
    ensure!(
        !payload.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline is required in body",
        }
    );

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
        .await;

    result
        .map(|pipeline| {
            GreptimedbManageResponse::from_pipeline(
                pipeline_name,
                pipeline.0.to_timezone_aware_string(None),
                start.elapsed().as_millis() as u64,
                None,
            )
        })
        .map_err(|e| {
            error!(e; "failed to insert pipeline");
            e
        })
}

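/// Handler that deletes one version of a pipeline. Unlike queries, deletion
/// requires an explicit `version` query parameter.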
#[axum_macros::debug_handler]
pub async fn delete_pipeline(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required",
        }
    );

    let version_str = query_params.version.context(InvalidParameterSnafu {
        reason: "version is required",
    })?;

    let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    handler
        .delete_pipeline(&pipeline_name, version, query_ctx)
        .await
        .map(|v| {
            if v.is_some() {
                GreptimedbManageResponse::from_pipeline(
                    pipeline_name,
                    version_str,
                    start.elapsed().as_millis() as u64,
                    None,
                )
            } else {
                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
            }
        })
        .map_err(|e| {
            error!(e; "failed to delete pipeline");
            e
        })
}

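/// Flattens a stream of JSON values into one vector of objects: arrays are
/// spliced in element by element and objects are pushed as-is, while scalars
/// are skipped (`ignore_error == true`) or rejected. Parse errors behave the
/// same way. For example, the NDJSON stream `{"a": 1}` followed by
/// `[{"b": 2}, {"c": 3}]` flattens to `[{"a": 1}, {"b": 2}, {"c": 3}]`.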
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<Value, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<Value>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    Value::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    Value::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}

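/// Runs `value` through `pipeline` against a throwaway "dry_run" table
/// without persisting anything, then renders the resulting schema and rows as
/// a JSON response so callers can inspect what the pipeline would produce.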
async fn dryrun_pipeline_inner(
    value: Vec<PipelineMap>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params);
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .into_iter()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let schema = rows.schema;

                let schema = schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(name_key.to_string(), Value::String(cs.column_name.clone()));
                        map.insert(
                            data_type_key.to_string(),
                            Value::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            Value::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            Value::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        Value::Object(map)
                    })
                    .collect::<Vec<_>>();

                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                v.value_data
                                    .map(|d| {
                                        let mut map = Map::new();
                                        map.insert("value".to_string(), column_data_to_json(d));
                                        map.insert(
                                            "key".to_string(),
                                            schema[idx][name_key].clone(),
                                        );
                                        map.insert(
                                            "semantic_type".to_string(),
                                            schema[idx][column_type_key].clone(),
                                        );
                                        map.insert(
                                            "data_type".to_string(),
                                            schema[idx][data_type_key].clone(),
                                        );
                                        Value::Object(map)
                                    })
                                    .unwrap_or(Value::Null)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), Value::Array(schema));
                result.insert("rows".to_string(), Value::Array(rows));
                result.insert("table_name".to_string(), Value::String(table_name));
                let result = Value::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(Value::Array(results)).into_response())
}

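/// Body of a pipeline dryrun request. Callers provide either a stored
/// pipeline (`pipeline_name`, optionally pinned by `pipeline_version`) or an
/// inline definition (`pipeline`), plus up to 10 sample rows in `data`.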
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    pub pipeline_name: Option<String>,
    pub pipeline_version: Option<String>,
    pub pipeline: Option<String>,
    pub data: Vec<Value>,
}

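/// Tries to parse the payload as `PipelineDryrunParams`. Returns `None` when
/// the body is not valid JSON or names no pipeline, in which case the caller
/// falls back to the legacy dryrun request format.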
fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
    match serde_json::from_slice::<PipelineDryrunParams>(payload) {
        // The params must name a stored pipeline or carry an inline one.
        Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
        // Valid JSON but no pipeline specified: treat as a legacy request.
        Ok(_) => None,
        // Not valid JSON: treat as a legacy request.
        Err(_) => None,
    }
}

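/// Requires a pipeline name, mapping its absence to an invalid-parameter error.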
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
    pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })
}

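/// Caps dryrun input at 10 rows to keep the endpoint cheap.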
fn check_data_valid(data_len: usize) -> Result<()> {
    ensure!(
        data_len <= 10,
        InvalidParameterSnafu {
            reason: "data length must not exceed 10",
        }
    );
    Ok(())
}

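/// Wraps a dryrun error in a JSON body, prefixing the message with the step
/// that failed and translating the internal status code to an HTTP status.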
fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
    let body = Json(json!({
        "error": format!("{}: {}", step_msg, e.output_msg()),
    }));

    (status_code_to_http_status(&e.status_code()), body).into_response()
}

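/// Handler for pipeline dryrun requests. It accepts the structured
/// `PipelineDryrunParams` JSON body, as well as a legacy mode in which the
/// body is the raw log payload and the pipeline is chosen via query
/// parameters.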
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = pipeline::json_array_to_map(params.data).context(PipelineSnafu)?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // Legacy dryrun request: the body is the raw log payload and the
            // pipeline is selected via query parameters.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}

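/// Handler for log ingestion. It runs the optional validator, resolves the
/// pipeline named in the query parameters, transforms the payload, and writes
/// the resulting rows to the target table.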
#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    let source = query_params.source.as_deref();
    // If a validator is configured, give it a chance to reject the payload.
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Http);
    let query_ctx = Arc::new(query_ctx);

    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        headers,
    )
    .await
}

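/// Parses the raw payload into pipeline maps according to its content type:
/// JSON (a single object, an array, or concatenated values), NDJSON (one JSON
/// object per line), or plain text (each non-empty line becomes a map with a
/// single "message" key).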
fn extract_pipeline_value_by_content_type(
    content_type: ContentType,
    payload: Bytes,
    ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
    Ok(match content_type {
        ct if ct == *JSON_CONTENT_TYPE => {
            // The body may hold several concatenated JSON values; flatten
            // them all into one list of objects before conversion.
            pipeline::json_array_to_map(transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            )?)
            .context(PipelineSnafu)?
        }
        ct if ct == *NDJSON_CONTENT_TYPE => {
            let mut result = Vec::with_capacity(1000);
            for (index, line) in payload.lines().enumerate() {
                let mut line = match line {
                    Ok(line) if !line.is_empty() => line,
                    // Skip empty lines silently.
                    Ok(_) => continue,
                    Err(_) if ignore_errors => continue,
                    Err(e) => {
                        warn!(e; "invalid string at index: {}", index);
                        return InvalidParameterSnafu {
                            reason: format!("invalid line at index: {}", index),
                        }
                        .fail();
                    }
                };

                // simd_json parses in place and may mutate the buffer; `line`
                // is an owned String here, so handing out `&mut [u8]` is sound.
                if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
                    let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
                    result.push(v);
                } else if !ignore_errors {
                    warn!("invalid JSON at index: {}, content: {:?}", index, line);
                    return InvalidParameterSnafu {
                        reason: format!("invalid JSON at index: {}", index),
                    }
                    .fail();
                }
            }
            result
        }
        ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
            .lines()
            .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
            .map(|line| {
                let mut map = PipelineMap::new();
                map.insert("message".to_string(), pipeline::Value::String(line));
                map
            })
            .collect::<Vec<_>>(),

        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
    })
}

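/// Runs every batch through the pipeline, inserts the produced rows, and
/// records per-database ingestion counters and latency metrics.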
pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    headers: HeaderMap,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    let mut insert_requests = Vec::with_capacity(log_ingest_requests.len());

    let pipeline_params = GreptimePipelineParams::from_params(
        headers
            .get(GREPTIME_PIPELINE_PARAMS_HEADER)
            .and_then(|v| v.to_str().ok()),
    );

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params);
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        insert_requests.extend(requests);
    }

    let output = handler
        .insert(
            RowInsertRequests {
                inserts: insert_requests,
            },
            query_ctx,
        )
        .await;

    if let Ok(Output {
        data: OutputData::AffectedRows(rows),
        meta: _,
    }) = &output
    {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(*rows as u64);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    } else {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(vec![output])
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}

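/// Hook for validating, and possibly rejecting, log payloads before they
/// enter the pipeline.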
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validate the payload for the given source before ingestion. Returning
    /// `Some` short-circuits ingestion and sends that response to the client.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}

pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;

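/// Shared state for the log ingestion and pipeline management HTTP handlers.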
#[derive(Clone)]
pub struct LogState {
    pub log_handler: PipelineHandlerRef,
    pub log_validator: Option<LogValidatorRef>,
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = Value::Array(
            transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .to_string();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }

    #[test]
    fn test_extract_by_content() {
        let payload = r#"
{"a": 1}
{"b": 2"}
{"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(
            fail_rest.unwrap(),
            pipeline::json_array_to_map(vec![json!({"a": 1})]).unwrap()
        );

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = PipelineMap::new();
        map1.insert("a".to_string(), pipeline::Value::Uint64(1));
        let mut map2 = PipelineMap::new();
        map2.insert("c".to_string(), pipeline::Value::Uint64(1));
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
}