Skip to main content

servers/http/
splunk.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Splunk HTTP Event Collector (HEC) compatible ingestion endpoint.
16//!
17//! Clients point their base endpoint at `/v1/splunk`, so the full paths are e.g.
18//! `/v1/splunk/services/collector/event` and `/v1/splunk/services/collector/health`.
19
20use std::collections::{BTreeMap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Instant;
23
24use api::v1::SemanticType;
25use axum::Extension;
26use axum::extract::{Query, State};
27use axum::http::{HeaderMap, StatusCode};
28use axum::response::IntoResponse;
29use bytes::Bytes;
30use chrono::{DateTime, Utc};
31use common_base::regex_pattern::NAME_PATTERN_REG;
32use common_error::ext::ErrorExt;
33use common_query::prelude::greptime_timestamp;
34use common_telemetry::{debug, error};
35use operator::insert::SPLUNK_PK_METADATA_ORDER_KEY;
36use pipeline::{
37    ContextReq, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GreptimePipelineParams, PipelineContext,
38    PipelineDefinition,
39};
40use serde_json::{Deserializer, json};
41use session::context::{Channel, QueryContext, QueryContextRef};
42use vrl::value::{KeyString, Value as VrlValue};
43
44use crate::error::{Result, status_code_to_http_status};
45use crate::http::HttpResponse;
46use crate::http::event::{
47    LogIngesterQueryParams, LogState, PipelineIngestRequest, execute_log_context_req,
48    extract_pipeline_params_map_from_headers, transform_ndjson_array_factory,
49};
50use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
51use crate::metrics::{METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED};
52use crate::pipeline::run_pipeline;
53use crate::query_handler::PipelineHandlerRef;
54
55/// Default table used when neither the event's `index` nor a `?table=` query
56/// param is provided.
57const DEFAULT_SPLUNK_TABLE: &str = "splunk_logs";
58/// HEC response code for a healthy collector. Splunk returns
59/// `{"text":"HEC is healthy","code":17}`.
60const HEC_HEALTHY_CODE: u32 = 17;
61
62/// HEC response body `{"text", "code"}`; clients branch on `code`.
63fn hec_response(status: StatusCode, code: u32, text: &str) -> axum::response::Response {
64    (status, axum::Json(json!({ "text": text, "code": code }))).into_response()
65}
66
67/// Parses a HEC body into a flat list of events. Handles both batch forms: objects
68/// concatenated with any/no separator, and a top-level array (flattened).
69fn parse_hec_events(body: &[u8]) -> Result<Vec<VrlValue>> {
70    let values = Deserializer::from_slice(body).into_iter::<VrlValue>();
71    // ignore_error = false: reject the whole batch on a malformed value.
72    transform_ndjson_array_factory(values, false)
73}
74
75/// HEC `time`: epoch seconds (optionally fractional); values past ~1e12 are read as
76/// milliseconds. `None` if absent/unparseable (caller falls back to ingest time).
77fn parse_hec_time(value: &VrlValue) -> Option<DateTime<Utc>> {
78    let n: f64 = match value {
79        VrlValue::Integer(i) => *i as f64,
80        VrlValue::Float(f) => f.into_inner(),
81        VrlValue::Bytes(b) => std::str::from_utf8(b).ok()?.trim().parse().ok()?,
82        VrlValue::Timestamp(dt) => return Some(*dt),
83        _ => return None,
84    };
85    if !n.is_finite() {
86        return None;
87    }
88    const MILLIS_THRESHOLD: f64 = 1e12;
89    // Safe (`Option`-returning) constructors: out-of-range input yields `None`, not a panic.
90    if n >= MILLIS_THRESHOLD {
91        DateTime::from_timestamp_millis(n as i64)
92    } else {
93        let secs = n.floor() as i64;
94        let nsecs = ((n - n.floor()) * 1e9) as u32;
95        DateTime::from_timestamp(secs, nsecs)
96    }
97}
98
99/// `event` missing -> 12, `event` blank -> 13.
100/// present, non-null but unparsable `time` -> 6.
101fn validate_event(event: &VrlValue) -> Option<(u32, &'static str)> {
102    let VrlValue::Object(obj) = event else {
103        return None;
104    };
105    match obj.get("event") {
106        None => return Some((12, "Event field is required")),
107        Some(value) if is_blank_event(value) => return Some((13, "Event field cannot be blank")),
108        _ => {}
109    }
110    if let Some(time) = obj.get("time")
111        && !matches!(time, VrlValue::Null)
112        && parse_hec_time(time).is_none()
113    {
114        return Some((6, "invalid data format"));
115    }
116    None
117}
118
119/// A HEC `event` value is blank if it's `null` or an empty/whitespace-only string.
120fn is_blank_event(value: &VrlValue) -> bool {
121    match value {
122        VrlValue::Null => true,
123        VrlValue::Bytes(b) => std::str::from_utf8(b).is_ok_and(|s| s.trim().is_empty()),
124        _ => false,
125    }
126}
127
128/// Maps one HEC event to `(table, per-event map, tag names)`: `time`->timestamp,
129/// `index`->table, host/source/sourcetype/`fields`->tags, `event`+rest->data.
130/// `None` if the event isn't a JSON object.
131fn hec_event_to_map(
132    event: VrlValue,
133    query_table: Option<&str>,
134) -> Option<(String, VrlValue, Vec<String>)> {
135    let mut obj = match event {
136        VrlValue::Object(obj) => obj,
137        other => {
138            debug!("skipping non-object splunk HEC event: {other:?}");
139            return None;
140        }
141    };
142
143    // Timestamp: HEC `time` is honored first, else ingest time.
144    let ts = obj
145        .remove("time")
146        .as_ref()
147        .and_then(parse_hec_time)
148        .unwrap_or_else(Utc::now);
149
150    // Table routing: `index` (consumed) -> `?table=` -> default.
151    let index = match obj.remove("index") {
152        Some(VrlValue::Bytes(b)) => Some(String::from_utf8_lossy(&b).into_owned()),
153        _ => None,
154    };
155    let table = index
156        .as_deref()
157        .and_then(sanitize_index)
158        .or_else(|| query_table.map(str::to_string))
159        .unwrap_or_else(|| DEFAULT_SPLUNK_TABLE.to_string());
160
161    let mut map: BTreeMap<KeyString, VrlValue> = BTreeMap::new();
162    map.insert(
163        KeyString::from(greptime_timestamp()),
164        VrlValue::Timestamp(ts),
165    );
166
167    let mut tag_names: Vec<String> = Vec::new();
168
169    // `fields` is flat: spread its keys to top-level columns, all tags.
170    if let Some(VrlValue::Object(fields)) = obj.remove("fields") {
171        for (k, v) in fields {
172            tag_names.push(k.as_str().to_string());
173            map.insert(k, v);
174        }
175    }
176
177    // host / source / sourcetype are tags.
178    for key in ["host", "source", "sourcetype"] {
179        if let Some(v) = obj.remove(key) {
180            tag_names.push(key.to_string());
181            map.insert(KeyString::from(key), v);
182        }
183    }
184
185    // `event` and any remaining keys are data columns.
186    for (k, v) in obj {
187        map.insert(k, v);
188    }
189
190    Some((table, VrlValue::Object(map), tag_names))
191}
192
193/// Retags `Field` columns to `Tag` per table (identity makes everything a Field) so the
194/// insert path adds them to the primary key. Tags are scoped by table name so a batch
195/// targeting multiple tables can't cross-promote a same-named field. Identity-only:
196/// rebuilds under the default opt.
197fn apply_tag_columns(
198    ctx_req: ContextReq,
199    tag_columns: &HashMap<String, HashSet<String>>,
200) -> ContextReq {
201    let mut reqs = ctx_req.all_req().collect::<Vec<_>>();
202    for req in &mut reqs {
203        let Some(rows) = req.rows.as_mut() else {
204            continue;
205        };
206        let Some(tags) = tag_columns.get(&req.table_name) else {
207            continue;
208        };
209        for col in &mut rows.schema {
210            if tags.contains(&col.column_name) {
211                col.semantic_type = SemanticType::Tag as i32;
212            }
213        }
214    }
215    ContextReq::default_opt_with_reqs(reqs)
216}
217
218/// Coerces a Splunk `index` into a valid table name (`NAME_PATTERN`); `None` if empty.
219fn sanitize_index(raw: &str) -> Option<String> {
220    let trimmed = raw.trim();
221    if trimmed.is_empty() {
222        return None;
223    }
224    if NAME_PATTERN_REG.is_match(trimmed) {
225        return Some(trimmed.to_string());
226    }
227    let mut out = String::with_capacity(trimmed.len());
228    for c in trimmed.chars() {
229        // body-allowed set
230        if c.is_ascii_alphanumeric() || matches!(c, '_' | ':' | '-' | '.' | '@' | '#') {
231            out.push(c);
232        } else {
233            out.push('_'); // spaces, slashes, unicode, etc. → '_'
234        }
235    }
236
237    let first_ok = out
238        .chars()
239        .next()
240        .map(|c| c.is_ascii_alphabetic() || matches!(c, '_' | ':' | '-'))
241        .unwrap_or(false);
242
243    if !first_ok {
244        out.insert(0, '_');
245    }
246
247    NAME_PATTERN_REG.is_match(&out).then_some(out)
248}
249
250pub(crate) fn is_splunk_request<B>(req: &axum::extract::Request<B>) -> bool {
251    // Match only `/v1/splunk/<subpath>`
252    req.uri().path().starts_with("/v1/splunk/")
253}
254/// Like `ingest_logs_inner`, but retags metadata columns (identity default) before insert.
255async fn ingest_events(
256    handler: PipelineHandlerRef,
257    pipeline: PipelineDefinition,
258    requests: Vec<PipelineIngestRequest>,
259    query_ctx: QueryContextRef,
260    pipeline_params: GreptimePipelineParams,
261    tag_columns: HashMap<String, HashSet<String>>,
262    apply_tags: bool,
263) -> Result<HttpResponse> {
264    let exec_timer = Instant::now();
265    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params, query_ctx.channel());
266
267    let mut ctx_req = ContextReq::default();
268    for req in requests {
269        ctx_req.merge(run_pipeline(&handler, &pipeline_ctx, req, &query_ctx, true).await?);
270    }
271
272    let ctx_req = if apply_tags {
273        apply_tag_columns(ctx_req, &tag_columns)
274    } else {
275        ctx_req
276    };
277
278    execute_log_context_req(
279        handler,
280        ctx_req,
281        query_ctx,
282        exec_timer,
283        &METRIC_HTTP_LOGS_INGESTION_COUNTER,
284        &METRIC_HTTP_LOGS_INGESTION_ELAPSED,
285    )
286    .await
287}
288
289/// `GET /services/collector/health` (+ `/1.0`). Public (see `PUBLIC_API_PREFIX`),
290/// since clients probe it before sending. `ack`/`token` query params are ignored.
291#[axum_macros::debug_handler]
292pub async fn handle_health() -> impl IntoResponse {
293    hec_response(StatusCode::OK, HEC_HEALTHY_CODE, "HEC is healthy")
294}
295
296/// `POST /services/collector/event` (+ `/services/collector`, `/event/1.0` aliases).
297/// Parses HEC events, runs them through the pipeline (identity default, overridable),
298/// and inserts with metadata columns as tags.
299#[axum_macros::debug_handler]
300pub async fn handle_event(
301    State(log_state): State<LogState>,
302    Query(params): Query<LogIngesterQueryParams>,
303    Extension(mut query_ctx): Extension<QueryContext>,
304    headers: HeaderMap,
305    payload: Bytes,
306) -> impl IntoResponse {
307    query_ctx.set_channel(Channel::Splunk);
308    let events = match parse_hec_events(&payload) {
309        Ok(events) => events,
310        // HEC code 6 == "invalid data format".
311        Err(_) => return hec_response(StatusCode::BAD_REQUEST, 6, "invalid data format"),
312    };
313    if events.is_empty() {
314        // HEC code 5 == "No data".
315        return hec_response(StatusCode::BAD_REQUEST, 5, "No data");
316    }
317
318    // Map each event -> (table, per-event map, tag names); group by table.
319    let query_table = params.table.as_deref();
320    let mut by_table: HashMap<String, Vec<VrlValue>> = HashMap::new();
321    let mut tag_columns: HashMap<String, HashSet<String>> = HashMap::new();
322    for event in events {
323        // Reject the batch on an invalid event: missing/blank `event` (12/13) or an
324        // unparsable `time` (6).
325        if let Some((code, text)) = validate_event(&event) {
326            return hec_response(StatusCode::BAD_REQUEST, code, text);
327        }
328        if let Some((table, map, tags)) = hec_event_to_map(event, query_table) {
329            tag_columns.entry(table.clone()).or_default().extend(tags);
330            by_table.entry(table).or_default().push(map);
331        }
332    }
333    let requests: Vec<PipelineIngestRequest> = by_table
334        .into_iter()
335        .map(|(table, values)| PipelineIngestRequest { table, values })
336        .collect();
337
338    // Events parsed but none were JSON objects, so nothing is ingestable. HEC code 6 == "invalid data format".
339    if requests.is_empty() {
340        return hec_response(StatusCode::BAD_REQUEST, 6, "invalid data format");
341    }
342
343    // Bad table name (e.g. invalid `?table=`) -> HEC code 7 ("incorrect index").
344    if let Some(bad) = requests
345        .iter()
346        .find(|r| !NAME_PATTERN_REG.is_match(&r.table))
347    {
348        let msg = format!("incorrect index: {}", bad.table);
349        return hec_response(StatusCode::BAD_REQUEST, 7, &msg);
350    }
351
352    // Pipeline: identity by default; override via `pipeline` param or header.
353    let pipeline_name = params.pipeline_name.clone().unwrap_or_else(|| {
354        headers
355            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
356            .and_then(|v| v.to_str().ok())
357            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
358            .to_string()
359    });
360    // Only post-process tags for the identity default; respect a user pipeline's schema.
361    let apply_tags = pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME;
362    if apply_tags {
363        // Ask the insert path to lead the primary key with our metadata tags. Scoped to the
364        // identity path so a user pipeline's own key order is left untouched.
365        query_ctx.set_extension(SPLUNK_PK_METADATA_ORDER_KEY, "true");
366    }
367    // custom_time_index so timestamp doesn't get overridden by identity pipeline.
368    let custom_time_index = Some((format!("{};epoch;ns", greptime_timestamp()), false));
369    let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, custom_time_index) {
370        Ok(pipeline) => pipeline,
371        Err(_) => return hec_response(StatusCode::INTERNAL_SERVER_ERROR, 8, "pipeline error"),
372    };
373    let pipeline_params =
374        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
375
376    match ingest_events(
377        log_state.log_handler,
378        pipeline,
379        requests,
380        Arc::new(query_ctx),
381        pipeline_params,
382        tag_columns,
383        apply_tags,
384    )
385    .await
386    {
387        // HEC code 0 == "Success".
388        Ok(_) => hec_response(StatusCode::OK, 0, "Success"),
389        Err(e) => {
390            error!(e; "failed to ingest splunk hec events");
391            // client errors -> HEC code 6, else 8.
392            let status = status_code_to_http_status(&e.status_code());
393            let code = if status.is_client_error() { 6 } else { 8 };
394            let msg = e.to_string();
395            hec_response(status, code, &msg)
396        }
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts a `serde_json` literal into the VRL value type used by the handler.
    fn vrl(value: serde_json::Value) -> VrlValue {
        value.into()
    }

    /// Parses `body`, panicking if the parser rejects it.
    fn events_for(body: &[u8]) -> Vec<VrlValue> {
        parse_hec_events(body).unwrap()
    }

    /// Unwraps a mapped event into its column map.
    fn expect_object(value: VrlValue) -> BTreeMap<KeyString, VrlValue> {
        let VrlValue::Object(columns) = value else {
            panic!("expected object");
        };
        columns
    }

    /// Order-insensitive view of the reported tag names.
    fn tag_set(tags: &[String]) -> HashSet<&str> {
        tags.iter().map(String::as_str).collect()
    }

    /// Epoch seconds of the timestamp column, if it holds a timestamp.
    fn timestamp_secs(columns: &BTreeMap<KeyString, VrlValue>) -> Option<i64> {
        match columns.get(greptime_timestamp()) {
            Some(VrlValue::Timestamp(dt)) => Some(dt.timestamp()),
            _ => None,
        }
    }

    /// Maps the first event of a parsed batch with no `?table=` override.
    fn map_first(events: Vec<VrlValue>) -> (String, VrlValue, Vec<String>) {
        hec_event_to_map(events.into_iter().next().unwrap(), None).unwrap()
    }

    // ---- parse_hec_events ----

    #[test]
    fn parses_single_object() {
        let parsed = events_for(br#"{"event":"hello","time":1}"#);
        assert_eq!(parsed, vec![vrl(json!({"event":"hello","time":1}))]);
    }

    #[test]
    fn parses_concatenated_objects_without_separator() {
        let parsed = events_for(br#"{"event":"a"}{"event":"b"}"#);
        let expected = vec![vrl(json!({"event":"a"})), vrl(json!({"event":"b"}))];
        assert_eq!(parsed, expected);
    }

    #[test]
    fn parses_newline_and_whitespace_separated_objects() {
        // A newline, leading spaces, and a tab sit between objects; none are required.
        let parsed = events_for(b"{\"event\":\"a\"}\n  {\"event\":\"b\"}\t{\"event\":\"c\"}");
        assert_eq!(parsed.len(), 3);
    }

    #[test]
    fn parses_top_level_array_into_flat_events() {
        let parsed = events_for(br#"[{"event":"a"},{"event":"b"}]"#);
        let expected = vec![vrl(json!({"event":"a"})), vrl(json!({"event":"b"}))];
        assert_eq!(parsed, expected);
    }

    #[test]
    fn flattens_mixed_array_and_trailing_object() {
        // A top-level array immediately followed by a bare object.
        let parsed = events_for(br#"[{"event":"a"},{"event":"b"}]{"event":"c"}"#);
        assert_eq!(parsed.len(), 3);
    }

    #[test]
    fn empty_or_whitespace_body_yields_no_events() {
        for body in [&b""[..], &b"   \n  "[..]] {
            assert!(events_for(body).is_empty());
        }
    }

    #[test]
    fn malformed_json_is_rejected() {
        let outcome = parse_hec_events(br#"{"event":"a"}{bad}"#);
        assert!(outcome.is_err());
    }

    // ---- parse_hec_time ----

    #[test]
    fn parse_time_integer_seconds() {
        let parsed = parse_hec_time(&vrl(json!(1426279439))).unwrap();
        assert_eq!(parsed.timestamp(), 1426279439);
    }

    #[test]
    fn parse_time_fractional_seconds_keeps_millis() {
        // The .5s must survive (not be truncated like EpochProcessor would).
        let parsed = parse_hec_time(&vrl(json!(1426279439.5))).unwrap();
        assert_eq!(parsed.timestamp_millis(), 1426279439500);
    }

    #[test]
    fn parse_time_integer_millis() {
        // Past the millis threshold -> read as ms, same instant as 1447828325s.
        let parsed = parse_hec_time(&vrl(json!(1447828325000_i64))).unwrap();
        assert_eq!(parsed.timestamp(), 1447828325);
    }

    #[test]
    fn parse_time_string_number() {
        let parsed = parse_hec_time(&vrl(json!("1426279439"))).unwrap();
        assert_eq!(parsed.timestamp(), 1426279439);
    }

    #[test]
    fn parse_time_passthrough_timestamp() {
        let instant = DateTime::from_timestamp_nanos(123_456_789);
        assert_eq!(parse_hec_time(&VrlValue::Timestamp(instant)), Some(instant));
    }

    #[test]
    fn parse_time_missing_or_invalid_is_none() {
        assert!(parse_hec_time(&VrlValue::Null).is_none());
        assert!(parse_hec_time(&vrl(json!("not a number"))).is_none());
        assert!(parse_hec_time(&vrl(json!({ "x": 1 }))).is_none());
    }

    // ---- sanitize_index ----

    #[test]
    fn sanitize_keeps_valid_names() {
        for name in ["main", "web-prod.2024", "cpu:metrics"] {
            assert_eq!(sanitize_index(name).as_deref(), Some(name));
        }
    }

    #[test]
    fn sanitize_replaces_invalid_chars() {
        let cleaned = sanitize_index("my index/v2");
        assert_eq!(cleaned.as_deref(), Some("my_index_v2"));
    }

    #[test]
    fn sanitize_fixes_leading_digit() {
        let cleaned = sanitize_index("123logs");
        assert_eq!(cleaned.as_deref(), Some("_123logs"));
    }

    #[test]
    fn sanitize_empty_is_none() {
        for blank in ["", "   "] {
            assert!(sanitize_index(blank).is_none());
        }
    }

    #[test]
    fn sanitize_output_is_always_a_valid_table_name() {
        // Invariant: a non-empty input never yields a name the create path would reject.
        let inputs = [
            "main",
            "web-prod.2024",
            "my index/v2",
            "123",
            "@#@#",
            "...",
            "日本語 logs",
            "a/b\\c",
        ];
        for raw in inputs {
            let Some(name) = sanitize_index(raw) else {
                continue;
            };
            assert!(
                NAME_PATTERN_REG.is_match(&name),
                "sanitized {raw:?} -> {name:?} is not a valid table name"
            );
        }
    }

    // ---- hec_event_to_map ----

    #[test]
    fn map_extracts_metadata_and_routes_by_index() {
        let event = vrl(json!({
            "time": 1426279439,
            "host": "web-01",
            "source": "nginx",
            "sourcetype": "access",
            "index": "web_logs",
            "event": "GET /api 200",
            "fields": { "region": "us-east" }
        }));

        let (table, map, tags) = hec_event_to_map(event, None).unwrap();

        // `index` -> table name.
        assert_eq!(table, "web_logs");

        // tags = host/source/sourcetype + each `fields` key.
        assert_eq!(
            tag_set(&tags),
            HashSet::from(["host", "source", "sourcetype", "region"])
        );

        let columns = expect_object(map);
        // Metadata + fields became columns with their values.
        assert_eq!(columns.get("host"), Some(&vrl(json!("web-01"))));
        assert_eq!(columns.get("region"), Some(&vrl(json!("us-east"))));
        assert_eq!(columns.get("event"), Some(&vrl(json!("GET /api 200"))));
        // `time` became the timestamp column, not a `time` column.
        assert!(!columns.contains_key("time"));
        assert_eq!(timestamp_secs(&columns), Some(1426279439));
        // `index` and `fields` are consumed, not columns.
        assert!(!columns.contains_key("index"));
        assert!(!columns.contains_key("fields"));
    }

    #[test]
    fn map_falls_back_to_query_table_then_default() {
        let (with_query, _, _) =
            hec_event_to_map(vrl(json!({ "event": "x" })), Some("from_query")).unwrap();
        assert_eq!(with_query, "from_query");

        let (without_query, _, _) = hec_event_to_map(vrl(json!({ "event": "x" })), None).unwrap();
        assert_eq!(without_query, "splunk_logs");
    }

    #[test]
    fn map_sanitizes_index_for_table() {
        let event = vrl(json!({ "index": "web/prod", "event": "x" }));
        let (table, _, _) = hec_event_to_map(event, None).unwrap();
        assert_eq!(table, "web_prod");
    }

    #[test]
    fn map_rejects_non_object_event() {
        let event = vrl(json!("just a string"));
        assert!(hec_event_to_map(event, None).is_none());
    }

    // ---- validate_event ----

    #[test]
    fn validates_event() {
        let verdict = |v: serde_json::Value| validate_event(&vrl(v));

        // Missing `event` -> code 12.
        assert_eq!(
            verdict(json!({ "host": "h" })),
            Some((12, "Event field is required"))
        );

        // Present but blank (empty / whitespace) or null -> code 13.
        let blank = Some((13, "Event field cannot be blank"));
        assert_eq!(verdict(json!({ "event": "" })), blank);
        assert_eq!(verdict(json!({ "event": "   " })), blank);
        assert_eq!(verdict(json!({ "event": null })), blank);

        // Valid: non-empty string, object, or other non-blank value.
        assert_eq!(verdict(json!({ "event": "hello" })), None);
        assert_eq!(verdict(json!({ "event": { "a": 1 } })), None);
        assert_eq!(verdict(json!({ "event": 0 })), None);
        // Non-object events aren't validated here (handled by `hec_event_to_map`).
        assert_eq!(verdict(json!("just a string")), None);

        // Present but unparsable `time` -> code 6 (number string / numeric are fine).
        let bad_time = Some((6, "invalid data format"));
        assert_eq!(
            verdict(json!({ "event": "x", "time": "not-a-time" })),
            bad_time
        );
        assert_eq!(
            verdict(json!({ "event": "x", "time": { "a": 1 } })),
            bad_time
        );
        assert_eq!(verdict(json!({ "event": "x", "time": 1700000000 })), None);
        assert_eq!(verdict(json!({ "event": "x", "time": "1700000000" })), None);
        // Absent or null `time` falls back to ingest time, so it's allowed.
        assert_eq!(verdict(json!({ "event": "x" })), None);
        assert_eq!(verdict(json!({ "event": "x", "time": null })), None);
    }

    #[test]
    fn parses_minimal_events_in_both_batch_forms() {
        // Splunk docs "Example 3": minimal events (`event` + `time` only), sent both as
        // concatenated objects (whitespace-separated) and as a JSON array.
        let concatenated = r#"{
  "event": "event 1",
  "time": 1447828325
}

{
  "event": "event 2",
  "time": 1447828326
}"#;
        let array = r#"[
  { "event": "event 1", "time": 1447828325 },
  { "event": "event 2", "time": 1447828326 }
]"#;

        for body in [concatenated, array] {
            let parsed = parse_hec_events(body.as_bytes()).unwrap();
            assert_eq!(parsed.len(), 2);

            let (table, map, tags) = map_first(parsed);
            assert_eq!(table, "splunk_logs"); // no `index` -> default table
            assert!(tags.is_empty()); // no host/source/sourcetype/fields

            let columns = expect_object(map);
            assert_eq!(columns.get("event"), Some(&vrl(json!("event 1"))));
            assert_eq!(timestamp_secs(&columns), Some(1447828325));
            assert!(!columns.contains_key("host"));
        }
    }

    #[test]
    fn map_keeps_event_object_for_pipeline_flattening() {
        let event = vrl(json!({ "event": { "a": 1 } }));
        let (_, map, _) = hec_event_to_map(event, None).unwrap();
        let columns = expect_object(map);
        assert!(matches!(columns.get("event"), Some(VrlValue::Object(_))));
    }

    #[test]
    fn map_uses_ingest_time_when_time_absent() {
        let event = vrl(json!({ "event": "x" }));
        let (_, map, _) = hec_event_to_map(event, None).unwrap();
        let columns = expect_object(map);
        assert!(timestamp_secs(&columns).is_some());
    }

    // ---- real client payload ----

    #[test]
    fn parses_real_client_payloads() {
        // Shapes captured from real `splunk_hec` clients (values trimmed, structure
        // verbatim). The two clients deliberately disagree on batch separator,
        // `event` type, and `fields` keys; the parser must handle all of it.

        // --- Vector splunk_hec sink: NO separator; `event` is an object. ---
        let vector = concat!(
            r#"{"event":{"message":"GET /api 200","status":"200"},"fields":{"region":"us-east"},"#,
            r#""time":1781713834.069,"host":"web-01","index":"main","source":"vector-src","sourcetype":"vector_demo"}"#,
            r#"{"event":{"message":"POST /login 401","status":"401"},"fields":{"region":"us-west"},"#,
            r#""time":1781713834.119,"host":"web-02","index":"main","source":"vector-src","sourcetype":"vector_demo"}"#,
        );
        let parsed = parse_hec_events(vector.as_bytes()).unwrap();
        assert_eq!(parsed.len(), 2); // concatenated, no separator

        let (table, map, tags) = map_first(parsed);
        assert_eq!(table, "main");
        assert_eq!(
            tag_set(&tags),
            HashSet::from(["host", "source", "sourcetype", "region"])
        );
        let columns = expect_object(map);
        assert_eq!(timestamp_secs(&columns), Some(1781713834));
        assert!(matches!(columns.get("event"), Some(VrlValue::Object(_)))); // event is an object

        // --- OTel Collector splunk_hec exporter: NEWLINE-separated; `event` is a
        //     string; a `fields` key contains dots. ---
        let otel = concat!(
            r#"{"event":"{\"level\":\"info\",\"msg\":\"login ok\"}","fields":{"log.file.name":"app.log"},"#,
            r#""host":"unknown","source":"otel-src","sourcetype":"otel_st","index":"main","time":1781714234.6849608}"#,
            "\n",
            r#"{"event":"{\"level\":\"error\",\"msg\":\"disk full\"}","fields":{"log.file.name":"app.log"},"#,
            r#""host":"unknown","source":"otel-src","sourcetype":"otel_st","index":"main","time":1781714234.6849632}"#,
        );
        let parsed = parse_hec_events(otel.as_bytes()).unwrap();
        assert_eq!(parsed.len(), 2); // newline-separated

        let (table, map, tags) = map_first(parsed);
        assert_eq!(table, "main");
        // A dotted `fields` key still becomes a tag column.
        assert_eq!(
            tag_set(&tags),
            HashSet::from(["host", "source", "sourcetype", "log.file.name"])
        );
        let columns = expect_object(map);
        assert_eq!(timestamp_secs(&columns), Some(1781714234));
        assert!(matches!(columns.get("event"), Some(VrlValue::Bytes(_)))); // event is a string
    }

    // ---- apply_tag_columns ----

    #[test]
    fn tag_promotion_is_scoped_per_table() {
        use api::v1::{ColumnDataType, ColumnSchema, RowInsertRequest, Rows};

        /// A string column that starts out as a plain `Field`.
        fn plain_field(name: &str) -> ColumnSchema {
            ColumnSchema {
                column_name: name.to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Field as i32,
                datatype_extension: None,
                options: None,
            }
        }

        /// A row-less insert request for `table` with the given field columns.
        fn insert_req(table: &str, cols: &[&str]) -> RowInsertRequest {
            let schema = cols.iter().map(|c| plain_field(c)).collect();
            RowInsertRequest {
                table_name: table.to_string(),
                rows: Some(Rows {
                    schema,
                    rows: vec![],
                }),
            }
        }

        // One batch -> two tables, both with a `region` column. `region` is a tag in
        // table "a" only; table "b"'s same-named field must NOT be promoted.
        let batch = ContextReq::default_opt_with_reqs(vec![
            insert_req("a", &["region"]),
            insert_req("b", &["region"]),
        ]);
        let tags: HashMap<String, HashSet<String>> = HashMap::from([
            ("a".to_string(), HashSet::from(["region".to_string()])),
            ("b".to_string(), HashSet::new()),
        ]);

        let retagged = apply_tag_columns(batch, &tags);

        for request in retagged.ref_all_req() {
            let schema = &request.rows.as_ref().unwrap().schema;
            let region = schema
                .iter()
                .find(|c| c.column_name == "region")
                .unwrap();
            let expected = match request.table_name.as_str() {
                "a" => SemanticType::Tag as i32,
                // Without per-table scoping this would have been promoted to Tag.
                "b" => SemanticType::Field as i32,
                other => panic!("unexpected table {other}"),
            };
            assert_eq!(
                region.semantic_type, expected,
                "table {}",
                request.table_name
            );
        }
    }
}