use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_telemetry::{debug, error};
use headers::ContentType;
use once_cell::sync::Lazy;
use pipeline::{
    GreptimePipelineParams, PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use serde_json::{json, Deserializer, Value};
use session::context::{Channel, QueryContext};
use snafu::{ensure, ResultExt};
use vrl::value::Value as VrlValue;

use crate::error::{
    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu,
    Result as ServersResult,
};
use crate::http::event::{
    extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
    PipelineIngestRequest,
};
use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
use crate::metrics::{
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};
/// Headers attached to every Elasticsearch-compatible response. The
/// `x-elastic-product` header lets Elastic clients recognize the server as
/// Elasticsearch.
static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
    HeaderMap::from_iter([
        (
            axum::http::header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        ),
        (
            HeaderName::from_static("x-elastic-product"),
            HeaderValue::from_static("Elasticsearch"),
        ),
    ])
});

/// The Elasticsearch version reported by `handle_get_version`.
const ELASTICSEARCH_VERSION: &str = "8.16.0";

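/// Handles the version probe (e.g. `GET /`) that Elasticsearch clients such as
/// Logstash and Filebeat issue before shipping data. The response body looks
/// roughly like:
///
/// ```text
/// {"version": {"number": "8.16.0"}}
/// ```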
#[axum_macros::debug_handler]
pub async fn handle_get_version() -> impl IntoResponse {
    let body = serde_json::json!({
        "version": {
            "number": ELASTICSEARCH_VERSION
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

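/// Handles the license probe that some Elasticsearch clients issue on startup.
/// Returns a canned OSS license so that clients accept the server; the body is
/// static, roughly:
///
/// ```text
/// {"license": {"uid": "...", "type": "oss", "status": "active", ...}}
/// ```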
#[axum_macros::debug_handler]
pub async fn handle_get_license() -> impl IntoResponse {
    let body = serde_json::json!({
        "license": {
            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
            "type": "oss",
            "status": "active",
            "expiry_date_in_millis": 4891198687000_i64,
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

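/// Handles `POST /_bulk` requests. With no index in the URL, every document
/// must carry its target index in its action line. An illustrative NDJSON
/// payload (action line followed by document line):
///
/// ```text
/// {"create": {"_index": "test", "_id": "1"}}
/// {"foo1": "foo1_value", "bar1": "bar1_value"}
/// ```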
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
}

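/// Handles `POST /{index}/_bulk` requests. The index from the URL path serves
/// as the fallback table name for documents whose action line omits `_index`.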
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
    State(log_state): State<LogState>,
    Path(index): Path<String>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
}

async fn do_handle_bulk_api(
    log_state: LogState,
    index: Option<String>,
    params: LogIngesterQueryParams,
    mut query_ctx: QueryContext,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    let start = Instant::now();
    debug!(
        "Received bulk request, params: {:?}, payload: {:?}",
        params, payload
    );

    query_ctx.set_channel(Channel::Elasticsearch);

    let db = query_ctx.current_schema();

    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
        .with_label_values(&[&db])
        .start_timer();

    // Resolve the pipeline name: the query parameter wins, then the Greptime
    // pipeline header, falling back to the internal identity pipeline.
    let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
        headers
            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
            .and_then(|v| v.to_str().ok())
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
    });

    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
        Ok(requests) => requests,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    StatusCode::BAD_REQUEST.as_u16() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
    let log_num = requests.len();

    let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
        Ok(pipeline) => pipeline,
        Err(e) => {
            error!(e; "Failed to ingest logs");
            return (
                status_code_to_http_status(&e.status_code()),
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    e.status_code() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
    let pipeline_params =
        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
    if let Err(e) = ingest_logs_inner(
        log_state.log_handler,
        pipeline,
        requests,
        Arc::new(query_ctx),
        pipeline_params,
    )
    .await
    {
        error!(e; "Failed to ingest logs");
        return (
            status_code_to_http_status(&e.status_code()),
            elasticsearch_headers(),
            axum::Json(write_bulk_response(
                start.elapsed().as_millis() as i64,
                0,
                e.status_code() as u32,
                e.to_string().as_str(),
            )),
        );
    }

    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
        .with_label_values(&[&db])
        .inc_by(log_num as u64);

    (
        StatusCode::OK,
        elasticsearch_headers(),
        axum::Json(write_bulk_response(
            start.elapsed().as_millis() as i64,
            log_num,
            StatusCode::CREATED.as_u16() as u32,
            "",
        )),
    )
}

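/// Builds an Elasticsearch-style bulk response. On success the body looks
/// roughly like:
///
/// ```text
/// {"took": 5, "errors": false, "items": [{"create": {"status": 201}}, ...]}
/// ```
///
/// On failure a single error item is returned:
///
/// ```text
/// {"took": 5, "errors": true, "items": [{"create": {"status": 400, "error": {...}}}]}
/// ```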
fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
    if error_reason.is_empty() {
        let items: Vec<Value> = (0..n)
            .map(|_| {
                json!({
                    "create": {
                        "status": status_code
                    }
                })
            })
            .collect();
        json!({
            "took": took_ms,
            "errors": false,
            "items": items,
        })
    } else {
        json!({
            "took": took_ms,
            "errors": true,
            "items": [
                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
            ]
        })
    }
}

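/// Returns a clone of the canned Elasticsearch-compatible response headers.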
pub fn elasticsearch_headers() -> HeaderMap {
    ELASTICSEARCH_HEADERS.clone()
}

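/// Parses an NDJSON bulk payload into per-table ingest requests. The payload
/// alternates action lines and document lines; only the `create` and `index`
/// actions are accepted. An illustrative input:
///
/// ```text
/// {"create": {"_index": "test", "_id": "1"}}
/// {"foo1": "foo1_value", "bar1": "bar1_value"}
/// ```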
fn parse_bulk_request(
    input: &str,
    index_from_url: &Option<String>,
    msg_field: &Option<String>,
) -> ServersResult<Vec<PipelineIngestRequest>> {
    let values: Vec<VrlValue> = Deserializer::from_str(input)
        .into_iter::<VrlValue>()
        .collect::<Result<_, _>>()
        .context(ParseJsonSnafu)?;

    ensure!(
        !values.is_empty(),
        InvalidElasticsearchInputSnafu {
            reason: "empty bulk request".to_string(),
        }
    );

    // Each action line is followed by its document line, so roughly half of
    // the parsed values become requests.
    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
    let mut values = values.into_iter();

    while let Some(cmd) = values.next() {
        // The action line is an object keyed by the action type.
        let mut cmd = cmd.into_object();
        let index = if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("create")) {
            get_index_from_cmd(cmd)?
        } else if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("index")) {
            get_index_from_cmd(cmd)?
        } else {
            return InvalidElasticsearchInputSnafu {
                reason: format!(
                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
                    cmd
                ),
            }
            .fail();
        };

        // Read the document that follows the action line; a trailing action
        // without a document is silently skipped.
        if let Some(document) = values.next() {
            let log_value = if let Some(msg_field) = msg_field {
                get_log_value_from_msg_field(document, msg_field)
            } else {
                document
            };

            ensure!(
                index.is_some() || index_from_url.is_some(),
                InvalidElasticsearchInputSnafu {
                    reason: "missing index in bulk request".to_string(),
                }
            );

            requests.push(PipelineIngestRequest {
                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
                values: vec![log_value],
            });
        }
    }

    debug!(
        "Received {} log ingest requests: {:?}",
        requests.len(),
        requests
    );

    Ok(requests)
}

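/// Extracts the `_index` field from an action object, returning `Ok(None)`
/// when the action does not name an index.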
fn get_index_from_cmd(v: VrlValue) -> ServersResult<Option<String>> {
    let Some(index) = v.into_object().and_then(|mut m| m.remove("_index")) else {
        return Ok(None);
    };

    if let VrlValue::Bytes(index) = index {
        Ok(Some(String::from_utf8_lossy(&index).to_string()))
    } else {
        InvalidElasticsearchInputSnafu {
            reason: "index is not a string in bulk request",
        }
        .fail()
    }
}

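/// Extracts the log value from `msg_field` when it is present in the document.
/// If the field holds a JSON string, the parsed JSON becomes the log value;
/// otherwise the raw content is kept under the original field name. For
/// example, with `msg_field = "data"` (illustrative):
///
/// ```text
/// {"data": "{\"foo\":\"bar\"}", "other": "x"}  =>  {"foo": "bar"}
/// ```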
fn get_log_value_from_msg_field(v: VrlValue, msg_field: &str) -> VrlValue {
    let VrlValue::Object(mut m) = v else {
        return v;
    };

    if let Some(message) = m.remove(msg_field) {
        match message {
            VrlValue::Bytes(bytes) => {
                match serde_json::from_slice::<VrlValue>(&bytes) {
                    Ok(v) => v,
                    Err(_) => {
                        // Not valid JSON; keep the raw message under the
                        // original field name.
                        let map = BTreeMap::from([(
                            msg_field.to_string().into(),
                            VrlValue::Bytes(bytes),
                        )]);
                        VrlValue::Object(map)
                    }
                }
            }
            _ => message,
        }
    } else {
        VrlValue::Object(m)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
            // The normal case: every document carries its index in the action line.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // An index in the action line takes precedence over the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // The "index" action is accepted and behaves like "create" here.
            (
                r#"
                {"index":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"index":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // A trailing action line without a document is silently skipped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                ]),
            ),
            // With `msg_field`, a JSON string in that field becomes the log value
            // and the rest of the document is dropped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // A realistic payload where `message` holds a plain (non-JSON)
            // string, which is kept under the original field name.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""}).into(),
                        ],
                    },
                ]),
            ),
            // An unknown action type is rejected.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's an invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                assert!(requests.is_err());
            }
        }
    }
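
    // A small extra check on the bulk response shape; the expected JSON simply
    // mirrors what `write_bulk_response` builds for the success and error paths.
    #[test]
    fn test_write_bulk_response() {
        let ok = write_bulk_response(100, 2, 201, "");
        assert_eq!(
            ok,
            json!({
                "took": 100,
                "errors": false,
                "items": [
                    { "create": { "status": 201 } },
                    { "create": { "status": 201 } },
                ],
            })
        );

        let err = write_bulk_response(100, 0, 400, "bad request");
        assert_eq!(err["errors"], json!(true));
        assert_eq!(err["items"][0]["create"]["status"], json!(400));
    }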
}