1use std::collections::BTreeMap;
16use std::fmt::Display;
17use std::io::BufRead;
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Instant;
21
22use api::helper::pb_value_to_value_ref;
23use async_trait::async_trait;
24use axum::body::Bytes;
25use axum::extract::{FromRequest, Multipart, Path, Query, Request, State};
26use axum::http::header::CONTENT_TYPE;
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::{IntoResponse, Response};
29use axum::{Extension, Json};
30use axum_extra::TypedHeader;
31use common_catalog::consts::default_engine;
32use common_error::ext::{BoxedError, ErrorExt};
33use common_query::{Output, OutputData};
34use common_telemetry::{error, warn};
35use headers::ContentType;
36use lazy_static::lazy_static;
37use mime_guess::mime;
38use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
39use pipeline::util::to_pipeline_version;
40use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
41use serde::{Deserialize, Serialize};
42use serde_json::{Deserializer, Map, Value as JsonValue, json};
43use session::context::{Channel, QueryContext, QueryContextRef};
44use simd_json::Buffers;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::mito_engine_options::APPEND_MODE_KEY;
47use strum::{EnumIter, IntoEnumIterator};
48use table::table_reference::TableReference;
49use vrl::value::{KeyString, Value as VrlValue};
50
51use crate::error::{
52 Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
53 status_code_to_http_status,
54};
55use crate::http::HttpResponse;
56use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
57use crate::http::header::{
58 CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
59};
60use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
61use crate::http::result::greptime_result_v1::GreptimedbV1Response;
62use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
63use crate::metrics::{
64 METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED,
65 METRIC_SUCCESS_VALUE,
66};
67use crate::pipeline::run_pipeline;
68use crate::query_handler::PipelineHandlerRef;
69
/// Prefix reserved for built-in pipelines; user-supplied names must not use it.
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
/// Pipeline-params key that toggles skipping (rather than failing on) row errors.
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";

// Informational messages attached to the SQL returned by `query_pipeline_ddl`.
const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
    "table already exists, the CREATE TABLE SQL may be different";

lazy_static! {
    // Cached `ContentType` values used for request parsing and error messages.
    pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
    pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
    pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
    pub static ref PB_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
    pub static ref NDJSON_CONTENT_TYPE: ContentType =
        ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}
87
/// Query-string parameters accepted by the log/pipeline HTTP endpoints.
///
/// Individual handlers require different subsets of these fields and validate
/// them on use.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct LogIngesterQueryParams {
    // Target database; presumably resolved by upper layers — not read directly in this file.
    pub db: Option<String>,
    // Destination table name (required by the ingestion and DDL endpoints).
    pub table: Option<String>,
    // Name of the pipeline to run/inspect.
    pub pipeline_name: Option<String>,
    // Pipeline version string; `None` means "latest".
    pub version: Option<String>,
    // When true, malformed payload entries are skipped instead of failing the request.
    pub ignore_errors: Option<bool>,
    // Log source tag passed to the optional `LogValidator`.
    pub source: Option<String>,
    // NOTE(review): not referenced in this file — presumably consumed elsewhere; confirm.
    pub msg_field: Option<String>,
    // Custom time-index specification forwarded to `PipelineDefinition::from_name`.
    pub custom_time_index: Option<String>,
    // When true, row-level pipeline errors are skipped during ingestion
    // (fallback for the pipeline-params header; see `log_ingester`).
    pub skip_error: Option<bool>,
}
122
/// One ingestion unit: a batch of rows destined for a single table.
#[derive(Debug, PartialEq)]
pub(crate) struct PipelineIngestRequest {
    // Destination table name.
    pub table: String,
    // Rows to run through the pipeline, as VRL values.
    pub values: Vec<VrlValue>,
}
132
/// Axum extractor carrying the raw pipeline definition text from the request body.
pub struct PipelineContent(String);
134
impl<S> FromRequest<S> for PipelineContent
where
    S: Send + Sync,
{
    type Rejection = Response;

    /// Extracts the pipeline body from the request.
    ///
    /// Two shapes are accepted:
    /// * a raw body whose `Content-Type` ends with "yaml";
    /// * a `multipart/form-data` upload, where the first field's text is used.
    ///
    /// Anything else is rejected with `415 Unsupported Media Type`.
    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        let content_type_header = req.headers().get(CONTENT_TYPE);
        let content_type = content_type_header.and_then(|value| value.to_str().ok());
        if let Some(content_type) = content_type {
            if content_type.ends_with("yaml") {
                let payload = String::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }

            if content_type.starts_with("multipart/form-data") {
                let mut payload: Multipart = Multipart::from_request(req, state)
                    .await
                    .map_err(IntoResponse::into_response)?;
                // Only the first multipart field is consumed; further fields
                // are ignored.
                let file = payload
                    .next_field()
                    .await
                    .map_err(IntoResponse::into_response)?;
                let payload = file
                    .ok_or(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())?
                    .text()
                    .await
                    .map_err(IntoResponse::into_response)?;
                return Ok(Self(payload));
            }
        }

        Err(StatusCode::UNSUPPORTED_MEDIA_TYPE.into_response())
    }
}
172
173#[axum_macros::debug_handler]
174pub async fn query_pipeline(
175 State(state): State<LogState>,
176 Extension(mut query_ctx): Extension<QueryContext>,
177 Query(query_params): Query<LogIngesterQueryParams>,
178 Path(pipeline_name): Path<String>,
179) -> Result<GreptimedbManageResponse> {
180 let start = Instant::now();
181 let handler = state.log_handler;
182 ensure!(
183 !pipeline_name.is_empty(),
184 InvalidParameterSnafu {
185 reason: "pipeline_name is required in path",
186 }
187 );
188
189 let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
190
191 query_ctx.set_channel(Channel::Log);
192 let query_ctx = Arc::new(query_ctx);
193
194 let (pipeline, pipeline_version) = handler
195 .get_pipeline_str(&pipeline_name, version, query_ctx)
196 .await?;
197
198 Ok(GreptimedbManageResponse::from_pipeline(
199 pipeline_name,
200 query_params
201 .version
202 .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)),
203 start.elapsed().as_millis() as u64,
204 Some(pipeline),
205 ))
206}
207
/// HTTP handler that renders the `CREATE TABLE` SQL a pipeline would produce
/// for the `table` query parameter, without creating anything.
///
/// Rejected for built-in pipelines and for auto-transform pipelines, since
/// neither has a fixed output schema. The response may carry a warning when
/// the table already exists or the pipeline's table name is not fixed.
#[axum_macros::debug_handler]
pub async fn query_pipeline_ddl(
    State(state): State<LogState>,
    Extension(mut query_ctx): Extension<QueryContext>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
    let start = Instant::now();
    let handler = state.log_handler;
    ensure!(
        !pipeline_name.is_empty(),
        InvalidParameterSnafu {
            reason: "pipeline_name is required in path",
        }
    );
    ensure!(
        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
        InvalidParameterSnafu {
            reason: "built-in pipelines don't have fixed table schema",
        }
    );
    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table name is required",
    })?;

    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    let pipeline = handler
        .get_pipeline(&pipeline_name, version, query_ctx.clone())
        .await?;

    // Auto-transform pipelines derive their schema from the data, so there is
    // no fixed schema to render.
    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
        reason: "auto transform doesn't have fixed table schema",
    })?;

    let schema = query_ctx.current_schema();
    let table_name_ref = TableReference {
        catalog: query_ctx.current_catalog(),
        schema: &schema,
        table: &table_name,
    };

    let mut create_table_expr =
        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
            .map_err(BoxedError::new)
            .context(OtherSnafu)?;

    // Pipeline-created tables are append-mode.
    create_table_expr
        .table_options
        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());

    let expr = expr_to_create(&create_table_expr, None)
        .map_err(BoxedError::new)
        .context(OtherSnafu)?;

    // Attach a caveat when the rendered SQL may not match what actually happens.
    let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() {
        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
    } else if pipeline.is_variant_table_name() {
        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
    } else {
        None
    };

    let sql = SqlOutput {
        // `{:#}` renders the expression in its pretty/alternate form.
        sql: format!("{:#}", expr),
        message,
    };

    Ok(GreptimedbManageResponse::from_sql(
        sql,
        start.elapsed().as_millis() as u64,
    ))
}
286
287#[axum_macros::debug_handler]
288pub async fn add_pipeline(
289 State(state): State<LogState>,
290 Path(pipeline_name): Path<String>,
291 Extension(mut query_ctx): Extension<QueryContext>,
292 PipelineContent(payload): PipelineContent,
293) -> Result<GreptimedbManageResponse> {
294 let start = Instant::now();
295 let handler = state.log_handler;
296 ensure!(
297 !pipeline_name.is_empty(),
298 InvalidParameterSnafu {
299 reason: "pipeline_name is required in path",
300 }
301 );
302 ensure!(
303 !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
304 InvalidParameterSnafu {
305 reason: "pipeline_name cannot start with greptime_",
306 }
307 );
308 ensure!(
309 !payload.is_empty(),
310 InvalidParameterSnafu {
311 reason: "pipeline is required in body",
312 }
313 );
314
315 query_ctx.set_channel(Channel::Log);
316 let query_ctx = Arc::new(query_ctx);
317
318 let content_type = "yaml";
319 let result = handler
320 .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
321 .await;
322
323 result
324 .map(|pipeline| {
325 GreptimedbManageResponse::from_pipeline(
326 pipeline_name,
327 pipeline.0.to_timezone_aware_string(None),
328 start.elapsed().as_millis() as u64,
329 None,
330 )
331 })
332 .map_err(|e| {
333 error!(e; "failed to insert pipeline");
334 e
335 })
336}
337
338#[axum_macros::debug_handler]
339pub async fn delete_pipeline(
340 State(state): State<LogState>,
341 Extension(mut query_ctx): Extension<QueryContext>,
342 Query(query_params): Query<LogIngesterQueryParams>,
343 Path(pipeline_name): Path<String>,
344) -> Result<GreptimedbManageResponse> {
345 let start = Instant::now();
346 let handler = state.log_handler;
347 ensure!(
348 !pipeline_name.is_empty(),
349 InvalidParameterSnafu {
350 reason: "pipeline_name is required",
351 }
352 );
353
354 let version_str = query_params.version.context(InvalidParameterSnafu {
355 reason: "version is required",
356 })?;
357
358 let version = to_pipeline_version(Some(&version_str)).context(PipelineSnafu)?;
359
360 query_ctx.set_channel(Channel::Log);
361 let query_ctx = Arc::new(query_ctx);
362
363 handler
364 .delete_pipeline(&pipeline_name, version, query_ctx)
365 .await
366 .map(|v| {
367 if v.is_some() {
368 GreptimedbManageResponse::from_pipeline(
369 pipeline_name,
370 version_str,
371 start.elapsed().as_millis() as u64,
372 None,
373 )
374 } else {
375 GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
376 }
377 })
378 .map_err(|e| {
379 error!(e; "failed to delete pipeline");
380 e
381 })
382}
383
/// Folds a stream of parsed JSON documents into a flat list of rows.
///
/// Each item may be a JSON object (one row) or an array (its elements are
/// flattened in). Scalar items and parse errors either abort the fold with
/// `InvalidParameter`/`ParseJson`, or — when `ignore_error` is true — are
/// skipped.
///
/// NOTE: the "invalid item: ..." wording below is matched by
/// `extract_pipeline_value_by_content_type` to rewrite the client-facing
/// message; keep the two in sync.
fn transform_ndjson_array_factory(
    values: impl IntoIterator<Item = Result<VrlValue, serde_json::Error>>,
    ignore_error: bool,
) -> Result<Vec<VrlValue>> {
    values
        .into_iter()
        .try_fold(Vec::with_capacity(100), |mut acc_array, item| match item {
            Ok(item_value) => {
                match item_value {
                    VrlValue::Array(item_array) => {
                        acc_array.extend(item_array);
                    }
                    VrlValue::Object(_) => {
                        acc_array.push(item_value);
                    }
                    _ => {
                        // Scalars (numbers, strings, ...) are not valid rows.
                        if !ignore_error {
                            warn!("invalid item in array: {:?}", item_value);
                            return InvalidParameterSnafu {
                                reason: format!("invalid item: {} in array", item_value),
                            }
                            .fail();
                        }
                    }
                }
                Ok(acc_array)
            }
            // `item` is known to be `Err` here, so the `map` never runs; this
            // just converts the serde error into our `ParseJson` error.
            Err(_) if !ignore_error => item.map(|x| vec![x]).context(ParseJsonSnafu),
            Err(_) => {
                warn!("invalid item in array: {:?}", item);
                Ok(acc_array)
            }
        })
}
420
/// Runs `value` through an already-resolved pipeline and renders the would-be
/// inserted rows as JSON, without persisting anything.
///
/// The response is a JSON array with one object per produced request, each
/// containing the table name, the column schema, and the rows annotated with
/// column name / data type / semantic type.
async fn dryrun_pipeline_inner(
    value: Vec<VrlValue>,
    pipeline: Arc<pipeline::Pipeline>,
    pipeline_handler: PipelineHandlerRef,
    query_ctx: &QueryContextRef,
) -> Result<Response> {
    let params = GreptimePipelineParams::default();

    let pipeline_def = PipelineDefinition::Resolved(pipeline);
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params, query_ctx.channel());
    // Rows are routed to a placeholder table name; dry-run never inserts.
    let results = run_pipeline(
        &pipeline_handler,
        &pipeline_ctx,
        PipelineIngestRequest {
            table: "dry_run".to_owned(),
            values: value,
        },
        query_ctx,
        true,
    )
    .await?;

    // JSON keys reused between the schema objects and the per-cell objects.
    let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

    let results = results
        .all_req()
        .filter_map(|row| {
            if let Some(rows) = row.rows {
                let table_name = row.table_name;
                let result_schema = rows.schema;

                // Render each column as {name, data_type, column_type, fulltext}.
                let schema = result_schema
                    .iter()
                    .map(|cs| {
                        let mut map = Map::new();
                        map.insert(
                            name_key.to_string(),
                            JsonValue::String(cs.column_name.clone()),
                        );
                        map.insert(
                            data_type_key.to_string(),
                            JsonValue::String(cs.datatype().as_str_name().to_string()),
                        );
                        map.insert(
                            column_type_key.to_string(),
                            JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                        );
                        map.insert(
                            "fulltext".to_string(),
                            JsonValue::Bool(
                                cs.options
                                    .clone()
                                    .is_some_and(|x| x.options.contains_key("fulltext")),
                            ),
                        );
                        JsonValue::Object(map)
                    })
                    .collect::<Vec<_>>();

                // Render each cell as {key, value, semantic_type, data_type},
                // pairing cell values with the schema entry at the same index.
                let rows = rows
                    .rows
                    .into_iter()
                    .map(|row| {
                        row.values
                            .into_iter()
                            .enumerate()
                            .map(|(idx, v)| {
                                let mut map = Map::new();
                                let value_ref = pb_value_to_value_ref(
                                    &v,
                                    result_schema[idx].datatype_extension.as_ref(),
                                );
                                let greptime_value: datatypes::value::Value = value_ref.into();
                                // NOTE(review): assumes every pipeline output value is
                                // JSON-representable — confirm `try_from` cannot fail here.
                                let serde_json_value =
                                    serde_json::Value::try_from(greptime_value).unwrap();
                                map.insert("value".to_string(), serde_json_value);
                                map.insert("key".to_string(), schema[idx][name_key].clone());
                                map.insert(
                                    "semantic_type".to_string(),
                                    schema[idx][column_type_key].clone(),
                                );
                                map.insert(
                                    "data_type".to_string(),
                                    schema[idx][data_type_key].clone(),
                                );
                                JsonValue::Object(map)
                            })
                            .collect()
                    })
                    .collect();

                let mut result = Map::new();
                result.insert("schema".to_string(), JsonValue::Array(schema));
                result.insert("rows".to_string(), JsonValue::Array(rows));
                result.insert("table_name".to_string(), JsonValue::String(table_name));
                let result = JsonValue::Object(result);
                Some(result)
            } else {
                None
            }
        })
        .collect();
    Ok(Json(JsonValue::Array(results)).into_response())
}
528
/// JSON body accepted by the dry-run endpoint.
///
/// At least one of `pipeline_name` (stored pipeline) or `pipeline` (inline
/// definition) must be present; see `check_pipeline_dryrun_params_valid`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
    // Name of a stored pipeline to run.
    pub pipeline_name: Option<String>,
    // Version of the stored pipeline; `None` means latest.
    pub pipeline_version: Option<String>,
    // Inline pipeline definition; when present it takes precedence over `pipeline_name`.
    pub pipeline: Option<String>,
    // Content type of `data`; defaults to "application/json" when omitted.
    pub data_type: Option<String>,
    // Sample input to feed through the pipeline.
    pub data: String,
}
542
543fn check_pipeline_dryrun_params_valid(payload: &Bytes) -> Option<PipelineDryrunParams> {
547 match serde_json::from_slice::<PipelineDryrunParams>(payload) {
548 Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
550 Ok(_) => None,
552 Err(_) => None,
554 }
555}
556
557fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
559 pipeline_name.context(InvalidParameterSnafu {
560 reason: "pipeline_name is required",
561 })
562}
563
564fn check_data_valid(data_len: usize) -> Result<()> {
566 ensure!(
567 data_len <= 10,
568 InvalidParameterSnafu {
569 reason: "data is required",
570 }
571 );
572 Ok(())
573}
574
575fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
576 let body = Json(json!({
577 "error": format!("{}: {}", step_msg,e.output_msg()),
578 }));
579
580 (status_code_to_http_status(&e.status_code()), body).into_response()
581}
582
583fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<VrlValue>> {
587 if let Ok(content_type) = ContentType::from_str(&data_type) {
588 extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
589 } else {
590 InvalidParameterSnafu {
591 reason: format!(
592 "invalid content type: {}, expected: one of {}",
593 data_type,
594 EventPayloadResolver::support_content_type_list().join(", ")
595 ),
596 }
597 .fail()
598 }
599}
600
/// HTTP handler that runs sample data through a pipeline without persisting
/// anything.
///
/// Two request forms are supported:
/// * a JSON body matching `PipelineDryrunParams` (detected first);
/// * the legacy form, where the raw body is the data and the pipeline name /
///   version come from the query string.
///
/// Dry-run input size is bounded by `check_data_valid`.
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: Bytes,
) -> Result<Response> {
    let handler = log_state.log_handler;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    match check_pipeline_dryrun_params_valid(&payload) {
        Some(params) => {
            let data = parse_dryrun_data(
                params.data_type.unwrap_or("application/json".to_string()),
                params.data,
            )?;

            check_data_valid(data.len())?;

            match params.pipeline {
                None => {
                    // Run a stored pipeline identified by name (+ optional version).
                    let version = to_pipeline_version(params.pipeline_version.as_deref())
                        .context(PipelineSnafu)?;
                    let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
                    let pipeline = handler
                        .get_pipeline(&pipeline_name, version, query_ctx.clone())
                        .await?;
                    dryrun_pipeline_inner(data, pipeline, handler, &query_ctx).await
                }
                Some(pipeline) => {
                    // Build and run an inline pipeline definition. Failures are
                    // converted into `Ok` responses so the failed step can be
                    // named in the error body.
                    let pipeline = handler.build_pipeline(&pipeline);
                    match pipeline {
                        Ok(pipeline) => {
                            match dryrun_pipeline_inner(
                                data,
                                Arc::new(pipeline),
                                handler,
                                &query_ctx,
                            )
                            .await
                            {
                                Ok(response) => Ok(response),
                                Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                                    "Failed to exec pipeline",
                                    e,
                                )),
                            }
                        }
                        Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
                            "Failed to build pipeline",
                            e,
                        )),
                    }
                }
            }
        }
        None => {
            // Legacy form: pipeline name/version from the query string, raw
            // data in the body, parsed according to the Content-Type header.
            let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;

            let version =
                to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;

            let ignore_errors = query_params.ignore_errors.unwrap_or(false);

            let value =
                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

            check_data_valid(value.len())?;

            let pipeline = handler
                .get_pipeline(&pipeline_name, version, query_ctx.clone())
                .await?;

            dryrun_pipeline_inner(value, pipeline, handler, &query_ctx).await
        }
    }
}
684
685pub(crate) fn extract_pipeline_params_map_from_headers(
686 headers: &HeaderMap,
687) -> ahash::HashMap<String, String> {
688 GreptimePipelineParams::parse_header_str_to_map(
689 headers
690 .get(GREPTIME_PIPELINE_PARAMS_HEADER)
691 .and_then(|v| v.to_str().ok()),
692 )
693}
694
/// HTTP handler that ingests log payloads through a named pipeline.
///
/// Flow: optional `LogValidator` veto → parse body per `Content-Type` →
/// `pre_pipeline` interceptor hook → run pipeline and insert via
/// `ingest_logs_inner`.
#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(log_state): State<LogState>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(mut query_ctx): Extension<QueryContext>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: Bytes,
) -> Result<HttpResponse> {
    // Give the configured validator (if any) a chance to short-circuit the request.
    let source = query_params.source.as_deref();
    let response = match &log_state.log_validator {
        Some(validator) => validator.validate(source, &payload).await,
        None => None,
    };
    if let Some(response) = response {
        return response;
    }

    let handler = log_state.log_handler;

    let table_name = query_params.table.context(InvalidParameterSnafu {
        reason: "table is required",
    })?;

    let ignore_errors = query_params.ignore_errors.unwrap_or(false);

    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
        reason: "pipeline_name is required",
    })?;
    let skip_error = query_params.skip_error.unwrap_or(false);
    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
    let pipeline = PipelineDefinition::from_name(
        &pipeline_name,
        version,
        query_params.custom_time_index.map(|s| (s, ignore_errors)),
    )
    .context(PipelineSnafu)?;

    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

    query_ctx.set_channel(Channel::Log);
    let query_ctx = Arc::new(query_ctx);

    // The interceptor may rewrite or filter the parsed values before the
    // pipeline runs.
    let value = log_state
        .ingest_interceptor
        .as_ref()
        .pre_pipeline(value, query_ctx.clone())?;

    // The query-string `skip_error` is only a fallback: an explicit value in
    // the pipeline-params header takes precedence.
    let mut pipeline_params_map = extract_pipeline_params_map_from_headers(&headers);
    if !pipeline_params_map.contains_key(GREPTIME_PIPELINE_SKIP_ERROR_KEY) && skip_error {
        pipeline_params_map.insert(GREPTIME_PIPELINE_SKIP_ERROR_KEY.to_string(), "true".into());
    }
    let pipeline_params = GreptimePipelineParams::from_map(pipeline_params_map);

    ingest_logs_inner(
        handler,
        pipeline,
        vec![PipelineIngestRequest {
            table: table_name,
            values: value,
        }],
        query_ctx,
        pipeline_params,
    )
    .await
}
762
/// Supported payload encodings for log ingestion, selected from `Content-Type`.
#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
    // A stream of JSON documents; arrays are flattened into individual rows.
    Json,
    // Newline-delimited JSON: one document per non-empty line.
    Ndjson,
    // Plain text: each non-empty line becomes a `{"message": line}` row.
    Text,
}
769
770impl Display for EventPayloadResolverInner {
771 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
772 match self {
773 EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
774 EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
775 EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
776 }
777 }
778}
779
780impl TryFrom<&ContentType> for EventPayloadResolverInner {
781 type Error = Error;
782
783 fn try_from(content_type: &ContentType) -> Result<Self> {
784 let mime: mime_guess::Mime = content_type.clone().into();
785 match (mime.type_(), mime.subtype()) {
786 (mime::APPLICATION, mime::JSON) => Ok(EventPayloadResolverInner::Json),
787 (mime::APPLICATION, subtype) if subtype == CONTENT_TYPE_NDJSON_SUBTYPE_STR => {
788 Ok(EventPayloadResolverInner::Ndjson)
789 }
790 (mime::TEXT, mime::PLAIN) => Ok(EventPayloadResolverInner::Text),
791 _ => InvalidParameterSnafu {
792 reason: format!(
793 "invalid content type: {}, expected: one of {}",
794 content_type,
795 EventPayloadResolver::support_content_type_list().join(", ")
796 ),
797 }
798 .fail(),
799 }
800 }
801}
802
/// Parser facade binding a validated content type to its payload decoder.
#[derive(Debug)]
struct EventPayloadResolver<'a> {
    inner: EventPayloadResolverInner,
    #[allow(dead_code)]
    // Kept for debugging/inspection; decoding relies only on `inner`.
    content_type: &'a ContentType,
}
811
812impl EventPayloadResolver<'_> {
813 pub(super) fn support_content_type_list() -> Vec<String> {
814 EventPayloadResolverInner::iter()
815 .map(|x| x.to_string())
816 .collect()
817 }
818}
819
820impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
821 type Error = Error;
822
823 fn try_from(content_type: &'a ContentType) -> Result<Self> {
824 let inner = EventPayloadResolverInner::try_from(content_type)?;
825 Ok(EventPayloadResolver {
826 inner,
827 content_type,
828 })
829 }
830}
831
impl EventPayloadResolver<'_> {
    /// Decodes the raw body into a list of VRL values according to the
    /// resolved content type.
    ///
    /// * JSON: parses a stream of documents and flattens arrays via
    ///   `transform_ndjson_array_factory`.
    /// * NDJSON: parses each non-empty line with simd-json; bad lines either
    ///   fail the request or, with `ignore_errors`, are skipped.
    /// * Text: wraps each non-empty line as `{"message": line}`.
    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<VrlValue>> {
        match self.inner {
            EventPayloadResolverInner::Json => transform_ndjson_array_factory(
                Deserializer::from_slice(&payload).into_iter(),
                ignore_errors,
            ),
            EventPayloadResolverInner::Ndjson => {
                let mut result = Vec::with_capacity(1000);
                let mut buffer = Buffers::new(1000);
                for (index, line) in payload.lines().enumerate() {
                    let mut line = match line {
                        Ok(line) if !line.is_empty() => line,
                        // Empty lines are skipped; unreadable lines are skipped
                        // only when `ignore_errors` is set.
                        Ok(_) => continue, Err(_) if ignore_errors => continue,
                        Err(e) => {
                            warn!(e; "invalid string at index: {}", index);
                            return InvalidParameterSnafu {
                                reason: format!("invalid line at index: {}", index),
                            }
                            .fail();
                        }
                    };

                    // simd-json parses in place and may mutate `line`'s bytes;
                    // the string is only reused afterwards for the (possibly
                    // garbled) debug log below.
                    if let Ok(v) = simd_json::serde::from_slice_with_buffers(
                        unsafe { line.as_bytes_mut() },
                        &mut buffer,
                    ) {
                        result.push(v);
                    } else if !ignore_errors {
                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
                        return InvalidParameterSnafu {
                            reason: format!("invalid JSON at index: {}", index),
                        }
                        .fail();
                    }
                }
                Ok(result)
            }
            EventPayloadResolverInner::Text => {
                let result = payload
                    .lines()
                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
                    .map(|line| {
                        let mut map = BTreeMap::new();
                        map.insert(
                            KeyString::from("message"),
                            VrlValue::Bytes(Bytes::from(line)),
                        );
                        VrlValue::Object(map)
                    })
                    .collect::<Vec<_>>();
                Ok(result)
            }
        }
    }
}
891
892fn extract_pipeline_value_by_content_type(
893 content_type: ContentType,
894 payload: Bytes,
895 ignore_errors: bool,
896) -> Result<Vec<VrlValue>> {
897 EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
898 resolver
899 .parse_payload(payload, ignore_errors)
900 .map_err(|e| match &e {
901 Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
902 if reason.contains("invalid item:") {
903 InvalidParameterSnafu {
904 reason: "json format error, please check the date is valid JSON.",
905 }
906 .build()
907 } else {
908 e
909 }
910 }
911 _ => e,
912 })
913 })
914}
915
/// Runs every ingest request through the pipeline, inserts the resulting
/// rows, and assembles the HTTP response.
///
/// Metrics: the per-database ingested-row counter and elapsed-time histogram
/// are recorded on success; a failure-labelled elapsed time is recorded when
/// any insert does not report affected rows.
pub(crate) async fn ingest_logs_inner(
    handler: PipelineHandlerRef,
    pipeline: PipelineDefinition,
    log_ingest_requests: Vec<PipelineIngestRequest>,
    query_ctx: QueryContextRef,
    pipeline_params: GreptimePipelineParams,
) -> Result<HttpResponse> {
    let db = query_ctx.get_db_string();
    let exec_timer = std::time::Instant::now();

    // Accumulates the row requests produced by all pipeline runs.
    let mut req = ContextReq::default();

    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
    for pipeline_req in log_ingest_requests {
        let requests =
            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;

        req.merge(requests);
    }

    let mut outputs = Vec::new();
    let mut total_rows: u64 = 0;
    let mut fail = false;
    for (temp_ctx, act_req) in req.as_req_iter(query_ctx) {
        let output = handler.insert(act_req, temp_ctx).await;

        // Count affected rows; anything else (error or other output kind)
        // marks the batch as failed for metrics purposes.
        if let Ok(Output {
            data: OutputData::AffectedRows(rows),
            meta: _,
        }) = &output
        {
            total_rows += *rows as u64;
        } else {
            fail = true;
        }
        outputs.push(output);
    }

    if total_rows > 0 {
        METRIC_HTTP_LOGS_INGESTION_COUNTER
            .with_label_values(&[db.as_str()])
            .inc_by(total_rows);
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }
    if fail {
        METRIC_HTTP_LOGS_INGESTION_ELAPSED
            .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
            .observe(exec_timer.elapsed().as_secs_f64());
    }

    let response = GreptimedbV1Response::from_output(outputs)
        .await
        .with_execution_time(exec_timer.elapsed().as_millis() as u64);
    Ok(response)
}
973
/// Hook for vetoing log payloads before any parsing or ingestion happens.
#[async_trait]
pub trait LogValidator: Send + Sync {
    /// Validates `payload` for the given `source` tag.
    ///
    /// Returning `Some(response)` short-circuits ingestion with that
    /// response; `None` lets ingestion proceed.
    async fn validate(&self, source: Option<&str>, payload: &Bytes)
        -> Option<Result<HttpResponse>>;
}
981
/// Shared, thread-safe handle to a [`LogValidator`] implementation.
pub type LogValidatorRef = Arc<dyn LogValidator + 'static>;
983
/// Axum shared state for the log ingestion routes.
#[derive(Clone)]
pub struct LogState {
    // Backend that resolves pipelines and performs inserts.
    pub log_handler: PipelineHandlerRef,
    // Optional payload validator, consulted before any parsing.
    pub log_validator: Option<LogValidatorRef>,
    // Optional hook applied to parsed values before the pipeline runs.
    pub ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
}
991
992#[cfg(test)]
993mod tests {
994
995 use super::*;
996
    // Objects and arrays of objects (NDJSON or plain JSON) must all flatten
    // into one JSON array of row objects.
    #[test]
    fn test_transform_ndjson() {
        let s = "{\"a\": 1}\n{\"b\": 2}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");

        let s = "{\"a\": 1}";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1}]");

        let s = "[{\"a\": 1}, {\"b\": 2}]";
        let a = serde_json::to_string(
            &transform_ndjson_array_factory(Deserializer::from_str(s).into_iter(), false).unwrap(),
        )
        .unwrap();
        assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
    }
1027
    // With `ignore_errors`, plain-JSON stream parsing stops at the first bad
    // document (only {"a":1} survives), while NDJSON parsing skips only the
    // bad line and keeps the rest.
    #[test]
    fn test_extract_by_content() {
        let payload = r#"
        {"a": 1}
        {"b": 2"}
        {"c": 1}
"#
        .as_bytes();
        let payload = Bytes::from_static(payload);

        let fail_rest =
            extract_pipeline_value_by_content_type(ContentType::json(), payload.clone(), true);
        assert!(fail_rest.is_ok());
        assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1}).into()]);

        let fail_only_wrong =
            extract_pipeline_value_by_content_type(NDJSON_CONTENT_TYPE.clone(), payload, true);
        assert!(fail_only_wrong.is_ok());

        let mut map1 = BTreeMap::new();
        map1.insert(KeyString::from("a"), VrlValue::Integer(1));
        let map1 = VrlValue::Object(map1);
        let mut map2 = BTreeMap::new();
        map2.insert(KeyString::from("c"), VrlValue::Integer(1));
        let map2 = VrlValue::Object(map2);
        assert_eq!(fail_only_wrong.unwrap(), vec![map1, map2]);
    }
1055}