1use std::collections::{BTreeMap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Instant;
23
24use api::v1::SemanticType;
25use axum::Extension;
26use axum::extract::{Query, State};
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::IntoResponse;
29use bytes::Bytes;
30use chrono::{DateTime, Utc};
31use common_base::regex_pattern::NAME_PATTERN_REG;
32use common_error::ext::ErrorExt;
33use common_query::prelude::greptime_timestamp;
34use common_telemetry::{debug, error};
35use operator::insert::SPLUNK_PK_METADATA_ORDER_KEY;
36use pipeline::{
37 ContextReq, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GreptimePipelineParams, PipelineContext,
38 PipelineDefinition,
39};
40use serde_json::{Deserializer, json};
41use session::context::{Channel, QueryContext, QueryContextRef};
42use vrl::value::{KeyString, Value as VrlValue};
43
44use crate::error::{Result, status_code_to_http_status};
45use crate::http::HttpResponse;
46use crate::http::event::{
47 LogIngesterQueryParams, LogState, PipelineIngestRequest, execute_log_context_req,
48 extract_pipeline_params_map_from_headers, transform_ndjson_array_factory,
49};
50use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
51use crate::metrics::{METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED};
52use crate::pipeline::run_pipeline;
53use crate::query_handler::PipelineHandlerRef;
54
/// HEC status code returned by the health endpoint together with "HEC is healthy".
const HEC_HEALTHY_CODE: u32 = 17;
/// Table used when an event carries no usable `index` and no `table` query parameter is set.
const DEFAULT_SPLUNK_TABLE: &str = "splunk_logs";
61
62fn hec_response(status: StatusCode, code: u32, text: &str) -> axum::response::Response {
64 (status, axum::Json(json!({ "text": text, "code": code }))).into_response()
65}
66
67fn parse_hec_events(body: &[u8]) -> Result<Vec<VrlValue>> {
70 let values = Deserializer::from_slice(body).into_iter::<VrlValue>();
71 transform_ndjson_array_factory(values, false)
73}
74
75fn parse_hec_time(value: &VrlValue) -> Option<DateTime<Utc>> {
78 let n: f64 = match value {
79 VrlValue::Integer(i) => *i as f64,
80 VrlValue::Float(f) => f.into_inner(),
81 VrlValue::Bytes(b) => std::str::from_utf8(b).ok()?.trim().parse().ok()?,
82 VrlValue::Timestamp(dt) => return Some(*dt),
83 _ => return None,
84 };
85 if !n.is_finite() {
86 return None;
87 }
88 const MILLIS_THRESHOLD: f64 = 1e12;
89 if n >= MILLIS_THRESHOLD {
91 DateTime::from_timestamp_millis(n as i64)
92 } else {
93 let secs = n.floor() as i64;
94 let nsecs = ((n - n.floor()) * 1e9) as u32;
95 DateTime::from_timestamp(secs, nsecs)
96 }
97}
98
99fn validate_event(event: &VrlValue) -> Option<(u32, &'static str)> {
102 let VrlValue::Object(obj) = event else {
103 return None;
104 };
105 match obj.get("event") {
106 None => return Some((12, "Event field is required")),
107 Some(value) if is_blank_event(value) => return Some((13, "Event field cannot be blank")),
108 _ => {}
109 }
110 if let Some(time) = obj.get("time")
111 && !matches!(time, VrlValue::Null)
112 && parse_hec_time(time).is_none()
113 {
114 return Some((6, "invalid data format"));
115 }
116 None
117}
118
119fn is_blank_event(value: &VrlValue) -> bool {
121 match value {
122 VrlValue::Null => true,
123 VrlValue::Bytes(b) => std::str::from_utf8(b).is_ok_and(|s| s.trim().is_empty()),
124 _ => false,
125 }
126}
127
128fn hec_event_to_map(
132 event: VrlValue,
133 query_table: Option<&str>,
134) -> Option<(String, VrlValue, Vec<String>)> {
135 let mut obj = match event {
136 VrlValue::Object(obj) => obj,
137 other => {
138 debug!("skipping non-object splunk HEC event: {other:?}");
139 return None;
140 }
141 };
142
143 let ts = obj
145 .remove("time")
146 .as_ref()
147 .and_then(parse_hec_time)
148 .unwrap_or_else(Utc::now);
149
150 let index = match obj.remove("index") {
152 Some(VrlValue::Bytes(b)) => Some(String::from_utf8_lossy(&b).into_owned()),
153 _ => None,
154 };
155 let table = index
156 .as_deref()
157 .and_then(sanitize_index)
158 .or_else(|| query_table.map(str::to_string))
159 .unwrap_or_else(|| DEFAULT_SPLUNK_TABLE.to_string());
160
161 let mut map: BTreeMap<KeyString, VrlValue> = BTreeMap::new();
162 map.insert(
163 KeyString::from(greptime_timestamp()),
164 VrlValue::Timestamp(ts),
165 );
166
167 let mut tag_names: Vec<String> = Vec::new();
168
169 if let Some(VrlValue::Object(fields)) = obj.remove("fields") {
171 for (k, v) in fields {
172 tag_names.push(k.as_str().to_string());
173 map.insert(k, v);
174 }
175 }
176
177 for key in ["host", "source", "sourcetype"] {
179 if let Some(v) = obj.remove(key) {
180 tag_names.push(key.to_string());
181 map.insert(KeyString::from(key), v);
182 }
183 }
184
185 for (k, v) in obj {
187 map.insert(k, v);
188 }
189
190 Some((table, VrlValue::Object(map), tag_names))
191}
192
193fn apply_tag_columns(
198 ctx_req: ContextReq,
199 tag_columns: &HashMap<String, HashSet<String>>,
200) -> ContextReq {
201 let mut reqs = ctx_req.all_req().collect::<Vec<_>>();
202 for req in &mut reqs {
203 let Some(rows) = req.rows.as_mut() else {
204 continue;
205 };
206 let Some(tags) = tag_columns.get(&req.table_name) else {
207 continue;
208 };
209 for col in &mut rows.schema {
210 if tags.contains(&col.column_name) {
211 col.semantic_type = SemanticType::Tag as i32;
212 }
213 }
214 }
215 ContextReq::default_opt_with_reqs(reqs)
216}
217
218fn sanitize_index(raw: &str) -> Option<String> {
220 let trimmed = raw.trim();
221 if trimmed.is_empty() {
222 return None;
223 }
224 if NAME_PATTERN_REG.is_match(trimmed) {
225 return Some(trimmed.to_string());
226 }
227 let mut out = String::with_capacity(trimmed.len());
228 for c in trimmed.chars() {
229 if c.is_ascii_alphanumeric() || matches!(c, '_' | ':' | '-' | '.' | '@' | '#') {
231 out.push(c);
232 } else {
233 out.push('_'); }
235 }
236
237 let first_ok = out
238 .chars()
239 .next()
240 .map(|c| c.is_ascii_alphabetic() || matches!(c, '_' | ':' | '-'))
241 .unwrap_or(false);
242
243 if !first_ok {
244 out.insert(0, '_');
245 }
246
247 NAME_PATTERN_REG.is_match(&out).then_some(out)
248}
249
250pub(crate) fn is_splunk_request<B>(req: &axum::extract::Request<B>) -> bool {
251 req.uri().path().starts_with("/v1/splunk/")
253}
254async fn ingest_events(
256 handler: PipelineHandlerRef,
257 pipeline: PipelineDefinition,
258 requests: Vec<PipelineIngestRequest>,
259 query_ctx: QueryContextRef,
260 pipeline_params: GreptimePipelineParams,
261 tag_columns: HashMap<String, HashSet<String>>,
262 apply_tags: bool,
263) -> Result<HttpResponse> {
264 let exec_timer = Instant::now();
265 let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
266
267 let mut ctx_req = ContextReq::default();
268 for req in requests {
269 ctx_req.merge(run_pipeline(&handler, &pipeline_ctx, req, &query_ctx, true).await?);
270 }
271
272 let ctx_req = if apply_tags {
273 apply_tag_columns(ctx_req, &tag_columns)
274 } else {
275 ctx_req
276 };
277
278 execute_log_context_req(
279 handler,
280 ctx_req,
281 query_ctx,
282 exec_timer,
283 &METRIC_HTTP_LOGS_INGESTION_COUNTER,
284 &METRIC_HTTP_LOGS_INGESTION_ELAPSED,
285 )
286 .await
287}
288
289#[axum_macros::debug_handler]
292pub async fn handle_health() -> impl IntoResponse {
293 hec_response(StatusCode::OK, HEC_HEALTHY_CODE, "HEC is healthy")
294}
295
296#[axum_macros::debug_handler]
300pub async fn handle_event(
301 State(log_state): State<LogState>,
302 Query(params): Query<LogIngesterQueryParams>,
303 Extension(mut query_ctx): Extension<QueryContext>,
304 headers: HeaderMap,
305 payload: Bytes,
306) -> impl IntoResponse {
307 query_ctx.set_channel(Channel::Splunk);
308 let events = match parse_hec_events(&payload) {
309 Ok(events) => events,
310 Err(_) => return hec_response(StatusCode::BAD_REQUEST, 6, "invalid data format"),
312 };
313 if events.is_empty() {
314 return hec_response(StatusCode::BAD_REQUEST, 5, "No data");
316 }
317
318 let query_table = params.table.as_deref();
320 let mut by_table: HashMap<String, Vec<VrlValue>> = HashMap::new();
321 let mut tag_columns: HashMap<String, HashSet<String>> = HashMap::new();
322 for event in events {
323 if let Some((code, text)) = validate_event(&event) {
326 return hec_response(StatusCode::BAD_REQUEST, code, text);
327 }
328 if let Some((table, map, tags)) = hec_event_to_map(event, query_table) {
329 tag_columns.entry(table.clone()).or_default().extend(tags);
330 by_table.entry(table).or_default().push(map);
331 }
332 }
333 let requests: Vec<PipelineIngestRequest> = by_table
334 .into_iter()
335 .map(|(table, values)| PipelineIngestRequest { table, values })
336 .collect();
337
338 if requests.is_empty() {
340 return hec_response(StatusCode::BAD_REQUEST, 6, "invalid data format");
341 }
342
343 if let Some(bad) = requests
345 .iter()
346 .find(|r| !NAME_PATTERN_REG.is_match(&r.table))
347 {
348 let msg = format!("incorrect index: {}", bad.table);
349 return hec_response(StatusCode::BAD_REQUEST, 7, &msg);
350 }
351
352 let pipeline_name = params.pipeline_name.clone().unwrap_or_else(|| {
354 headers
355 .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
356 .and_then(|v| v.to_str().ok())
357 .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
358 .to_string()
359 });
360 let apply_tags = pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME;
362 if apply_tags {
363 query_ctx.set_extension(SPLUNK_PK_METADATA_ORDER_KEY, "true");
366 }
367 let custom_time_index = Some((format!("{};epoch;ns", greptime_timestamp()), false));
369 let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, custom_time_index) {
370 Ok(pipeline) => pipeline,
371 Err(_) => return hec_response(StatusCode::INTERNAL_SERVER_ERROR, 8, "pipeline error"),
372 };
373 let pipeline_params =
374 GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
375
376 match ingest_events(
377 log_state.log_handler,
378 pipeline,
379 requests,
380 Arc::new(query_ctx),
381 pipeline_params,
382 tag_columns,
383 apply_tags,
384 )
385 .await
386 {
387 Ok(_) => hec_response(StatusCode::OK, 0, "Success"),
389 Err(e) => {
390 error!(e; "failed to ingest splunk hec events");
391 let status = status_code_to_http_status(&e.status_code());
393 let code = if status.is_client_error() { 6 } else { 8 };
394 let msg = e.to_string();
395 hec_response(status, code, &msg)
396 }
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `body`, panicking on malformed input.
    fn events_for(body: &[u8]) -> Vec<VrlValue> {
        parse_hec_events(body).unwrap()
    }

    // ---- body parsing ----

    #[test]
    fn parses_single_object() {
        let events = events_for(br#"{"event":"hello","time":1}"#);
        assert_eq!(events, vec![json!({"event":"hello","time":1}).into()]);
    }

    #[test]
    fn parses_concatenated_objects_without_separator() {
        let events = events_for(br#"{"event":"a"}{"event":"b"}"#);
        assert_eq!(
            events,
            vec![json!({"event":"a"}).into(), json!({"event":"b"}).into()]
        );
    }

    #[test]
    fn parses_newline_and_whitespace_separated_objects() {
        let events = events_for(b"{\"event\":\"a\"}\n {\"event\":\"b\"}\t{\"event\":\"c\"}");
        assert_eq!(events.len(), 3);
    }

    #[test]
    fn parses_top_level_array_into_flat_events() {
        let events = events_for(br#"[{"event":"a"},{"event":"b"}]"#);
        assert_eq!(
            events,
            vec![json!({"event":"a"}).into(), json!({"event":"b"}).into()]
        );
    }

    #[test]
    fn flattens_mixed_array_and_trailing_object() {
        let events = events_for(br#"[{"event":"a"},{"event":"b"}]{"event":"c"}"#);
        assert_eq!(events.len(), 3);
    }

    #[test]
    fn empty_or_whitespace_body_yields_no_events() {
        assert!(events_for(b"").is_empty());
        assert!(events_for(b" \n ").is_empty());
    }

    #[test]
    fn malformed_json_is_rejected() {
        assert!(parse_hec_events(br#"{"event":"a"}{bad}"#).is_err());
    }

    // ---- time parsing ----

    #[test]
    fn parse_time_integer_seconds() {
        let v: VrlValue = json!(1426279439).into();
        assert_eq!(parse_hec_time(&v).unwrap().timestamp(), 1426279439);
    }

    #[test]
    fn parse_time_fractional_seconds_keeps_millis() {
        let v: VrlValue = json!(1426279439.5).into();
        assert_eq!(
            parse_hec_time(&v).unwrap().timestamp_millis(),
            1426279439500
        );
    }

    #[test]
    fn parse_time_negative_fractional_seconds() {
        let v: VrlValue = json!(-1.5).into();
        assert_eq!(parse_hec_time(&v).unwrap().timestamp_millis(), -1500);
    }

    #[test]
    fn parse_time_integer_millis() {
        let v: VrlValue = json!(1447828325000_i64).into();
        assert_eq!(parse_hec_time(&v).unwrap().timestamp(), 1447828325);
    }

    #[test]
    fn parse_time_string_number() {
        let v: VrlValue = json!("1426279439").into();
        assert_eq!(parse_hec_time(&v).unwrap().timestamp(), 1426279439);
    }

    #[test]
    fn parse_time_passthrough_timestamp() {
        let dt = DateTime::from_timestamp_nanos(123_456_789);
        assert_eq!(parse_hec_time(&VrlValue::Timestamp(dt)), Some(dt));
    }

    #[test]
    fn parse_time_missing_or_invalid_is_none() {
        assert!(parse_hec_time(&VrlValue::Null).is_none());
        let not_num: VrlValue = json!("not a number").into();
        assert!(parse_hec_time(&not_num).is_none());
        let obj: VrlValue = json!({ "x": 1 }).into();
        assert!(parse_hec_time(&obj).is_none());
    }

    #[test]
    fn parse_time_non_finite_string_is_none() {
        for raw in ["NaN", "inf", "-inf"] {
            let v: VrlValue = json!(raw).into();
            assert!(parse_hec_time(&v).is_none(), "{raw} should be rejected");
        }
    }

    // ---- index sanitizing ----

    #[test]
    fn sanitize_keeps_valid_names() {
        assert_eq!(sanitize_index("main").as_deref(), Some("main"));
        assert_eq!(
            sanitize_index("web-prod.2024").as_deref(),
            Some("web-prod.2024")
        );
        assert_eq!(
            sanitize_index("cpu:metrics").as_deref(),
            Some("cpu:metrics")
        );
    }

    #[test]
    fn sanitize_trims_surrounding_whitespace() {
        assert_eq!(sanitize_index("  main \t").as_deref(), Some("main"));
    }

    #[test]
    fn sanitize_replaces_invalid_chars() {
        assert_eq!(
            sanitize_index("my index/v2").as_deref(),
            Some("my_index_v2")
        );
    }

    #[test]
    fn sanitize_fixes_leading_digit() {
        assert_eq!(sanitize_index("123logs").as_deref(), Some("_123logs"));
    }

    #[test]
    fn sanitize_empty_is_none() {
        assert!(sanitize_index("").is_none());
        assert!(sanitize_index(" ").is_none());
    }

    #[test]
    fn sanitize_output_is_always_a_valid_table_name() {
        for raw in [
            "main",
            "web-prod.2024",
            "my index/v2",
            "123",
            "@#@#",
            "...",
            "日本語 logs",
            "a/b\\c",
        ] {
            if let Some(name) = sanitize_index(raw) {
                assert!(
                    NAME_PATTERN_REG.is_match(&name),
                    "sanitized {raw:?} -> {name:?} is not a valid table name"
                );
            }
        }
    }

    // ---- event -> row mapping ----

    #[test]
    fn map_extracts_metadata_and_routes_by_index() {
        let event: VrlValue = json!({
            "time": 1426279439,
            "host": "web-01",
            "source": "nginx",
            "sourcetype": "access",
            "index": "web_logs",
            "event": "GET /api 200",
            "fields": { "region": "us-east" }
        })
        .into();

        let (table, map, tags) = hec_event_to_map(event, None).unwrap();

        assert_eq!(table, "web_logs");

        let tagset: HashSet<&str> = tags.iter().map(String::as_str).collect();
        assert_eq!(
            tagset,
            HashSet::from(["host", "source", "sourcetype", "region"])
        );

        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert_eq!(m.get("host"), Some(&VrlValue::from(json!("web-01"))));
        assert_eq!(m.get("region"), Some(&VrlValue::from(json!("us-east"))));
        assert_eq!(m.get("event"), Some(&VrlValue::from(json!("GET /api 200"))));
        assert!(!m.contains_key("time"));
        assert!(matches!(
            m.get(greptime_timestamp()),
            Some(VrlValue::Timestamp(dt)) if dt.timestamp() == 1426279439
        ));
        assert!(!m.contains_key("index"));
        assert!(!m.contains_key("fields"));
    }

    #[test]
    fn map_falls_back_to_query_table_then_default() {
        let ev1: VrlValue = json!({ "event": "x" }).into();
        let (t1, _, _) = hec_event_to_map(ev1, Some("from_query")).unwrap();
        assert_eq!(t1, "from_query");

        let ev2: VrlValue = json!({ "event": "x" }).into();
        let (t2, _, _) = hec_event_to_map(ev2, None).unwrap();
        assert_eq!(t2, "splunk_logs");
    }

    #[test]
    fn map_ignores_non_string_index() {
        let ev: VrlValue = json!({ "index": 42, "event": "x" }).into();
        let (table, map, _) = hec_event_to_map(ev, Some("from_query")).unwrap();
        assert_eq!(table, "from_query");
        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert!(!m.contains_key("index"));
    }

    #[test]
    fn map_sanitizes_index_for_table() {
        let ev: VrlValue = json!({ "index": "web/prod", "event": "x" }).into();
        let (table, _, _) = hec_event_to_map(ev, None).unwrap();
        assert_eq!(table, "web_prod");
    }

    #[test]
    fn map_rejects_non_object_event() {
        let ev: VrlValue = json!("just a string").into();
        assert!(hec_event_to_map(ev, None).is_none());
    }

    // ---- validation ----

    #[test]
    fn validates_event() {
        let check = |v: serde_json::Value| validate_event(&v.into());

        assert_eq!(
            check(json!({ "host": "h" })),
            Some((12, "Event field is required"))
        );
        assert_eq!(
            check(json!({ "event": "" })),
            Some((13, "Event field cannot be blank"))
        );
        assert_eq!(
            check(json!({ "event": " " })),
            Some((13, "Event field cannot be blank"))
        );
        assert_eq!(
            check(json!({ "event": null })),
            Some((13, "Event field cannot be blank"))
        );
        assert_eq!(check(json!({ "event": "hello" })), None);
        assert_eq!(check(json!({ "event": { "a": 1 } })), None);
        assert_eq!(check(json!({ "event": 0 })), None);
        assert_eq!(check(json!("just a string")), None);

        let bad_time = Some((6, "invalid data format"));
        assert_eq!(
            check(json!({ "event": "x", "time": "not-a-time" })),
            bad_time
        );
        assert_eq!(check(json!({ "event": "x", "time": { "a": 1 } })), bad_time);
        assert_eq!(check(json!({ "event": "x", "time": 1700000000 })), None);
        assert_eq!(check(json!({ "event": "x", "time": "1700000000" })), None);
        assert_eq!(check(json!({ "event": "x" })), None);
        assert_eq!(check(json!({ "event": "x", "time": null })), None);
    }

    // ---- end-to-end payload shapes ----

    #[test]
    fn parses_minimal_events_in_both_batch_forms() {
        let concatenated = r#"{
    "event": "event 1",
    "time": 1447828325
}

{
    "event": "event 2",
    "time": 1447828326
}"#;
        let array = r#"[
    { "event": "event 1", "time": 1447828325 },
    { "event": "event 2", "time": 1447828326 }
]"#;

        for body in [concatenated, array] {
            let events = parse_hec_events(body.as_bytes()).unwrap();
            assert_eq!(events.len(), 2);

            let (table, map, tags) =
                hec_event_to_map(events.into_iter().next().unwrap(), None).unwrap();
            assert_eq!(table, "splunk_logs");
            assert!(tags.is_empty());
            let VrlValue::Object(m) = map else {
                panic!("expected object");
            };
            assert_eq!(m.get("event"), Some(&VrlValue::from(json!("event 1"))));
            assert!(matches!(
                m.get(greptime_timestamp()),
                Some(VrlValue::Timestamp(dt)) if dt.timestamp() == 1447828325
            ));
            assert!(!m.contains_key("host"));
        }
    }

    #[test]
    fn map_keeps_event_object_for_pipeline_flattening() {
        let ev: VrlValue = json!({ "event": { "a": 1 } }).into();
        let (_, map, _) = hec_event_to_map(ev, None).unwrap();
        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert!(matches!(m.get("event"), Some(VrlValue::Object(_))));
    }

    #[test]
    fn map_uses_ingest_time_when_time_absent() {
        let ev: VrlValue = json!({ "event": "x" }).into();
        let (_, map, _) = hec_event_to_map(ev, None).unwrap();
        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert!(matches!(
            m.get(greptime_timestamp()),
            Some(VrlValue::Timestamp(_))
        ));
    }

    #[test]
    fn parses_real_client_payloads() {
        let vector = concat!(
            r#"{"event":{"message":"GET /api 200","status":"200"},"fields":{"region":"us-east"},"#,
            r#""time":1781713834.069,"host":"web-01","index":"main","source":"vector-src","sourcetype":"vector_demo"}"#,
            r#"{"event":{"message":"POST /login 401","status":"401"},"fields":{"region":"us-west"},"#,
            r#""time":1781713834.119,"host":"web-02","index":"main","source":"vector-src","sourcetype":"vector_demo"}"#,
        );
        let events = parse_hec_events(vector.as_bytes()).unwrap();
        assert_eq!(events.len(), 2);
        let (table, map, tags) =
            hec_event_to_map(events.into_iter().next().unwrap(), None).unwrap();
        assert_eq!(table, "main");
        let tagset: HashSet<&str> = tags.iter().map(String::as_str).collect();
        assert_eq!(
            tagset,
            HashSet::from(["host", "source", "sourcetype", "region"])
        );
        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert!(matches!(
            m.get(greptime_timestamp()),
            Some(VrlValue::Timestamp(dt)) if dt.timestamp() == 1781713834
        ));
        assert!(matches!(m.get("event"), Some(VrlValue::Object(_))));

        let otel = concat!(
            r#"{"event":"{\"level\":\"info\",\"msg\":\"login ok\"}","fields":{"log.file.name":"app.log"},"#,
            r#""host":"unknown","source":"otel-src","sourcetype":"otel_st","index":"main","time":1781714234.6849608}"#,
            "\n",
            r#"{"event":"{\"level\":\"error\",\"msg\":\"disk full\"}","fields":{"log.file.name":"app.log"},"#,
            r#""host":"unknown","source":"otel-src","sourcetype":"otel_st","index":"main","time":1781714234.6849632}"#,
        );
        let events = parse_hec_events(otel.as_bytes()).unwrap();
        assert_eq!(events.len(), 2);
        let (table, map, tags) =
            hec_event_to_map(events.into_iter().next().unwrap(), None).unwrap();
        assert_eq!(table, "main");
        let tagset: HashSet<&str> = tags.iter().map(String::as_str).collect();
        assert_eq!(
            tagset,
            HashSet::from(["host", "source", "sourcetype", "log.file.name"])
        );
        let VrlValue::Object(m) = map else {
            panic!("expected object");
        };
        assert!(matches!(
            m.get(greptime_timestamp()),
            Some(VrlValue::Timestamp(dt)) if dt.timestamp() == 1781714234
        ));
        assert!(matches!(m.get("event"), Some(VrlValue::Bytes(_))));
    }

    // ---- tag promotion ----

    #[test]
    fn tag_promotion_is_scoped_per_table() {
        use api::v1::{ColumnDataType, ColumnSchema, RowInsertRequest, Rows};

        fn field_col(name: &str) -> ColumnSchema {
            ColumnSchema {
                column_name: name.to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                datatype_extension: None,
                options: None,
            }
        }

        fn req(table: &str, cols: &[&str]) -> RowInsertRequest {
            RowInsertRequest {
                table_name: table.to_string(),
                rows: Some(Rows {
                    schema: cols.iter().map(|c| field_col(c)).collect(),
                    rows: vec![],
                }),
            }
        }

        let ctx_req =
            ContextReq::default_opt_with_reqs(vec![req("a", &["region"]), req("b", &["region"])]);
        let mut tags: HashMap<String, HashSet<String>> = HashMap::new();
        tags.insert("a".to_string(), HashSet::from(["region".to_string()]));
        tags.insert("b".to_string(), HashSet::new());

        let out = apply_tag_columns(ctx_req, &tags);

        for r in out.ref_all_req() {
            let region = r
                .rows
                .as_ref()
                .unwrap()
                .schema
                .iter()
                .find(|c| c.column_name == "region")
                .unwrap();
            let expected = match r.table_name.as_str() {
                "a" => SemanticType::Tag as i32,
                "b" => SemanticType::Field as i32,
                other => panic!("unexpected table {other}"),
            };
            assert_eq!(region.semantic_type, expected, "table {}", r.table_name);
        }
    }
}