1use std::collections::BTreeMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::Extension;
20use axum::extract::{Path, Query, State};
21use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_error::ext::ErrorExt;
25use common_telemetry::{debug, error};
26use headers::ContentType;
27use once_cell::sync::Lazy;
28use pipeline::{
29 GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GreptimePipelineParams, PipelineDefinition,
30};
31use serde_json::{Deserializer, Value, json};
32use session::context::{Channel, QueryContext};
33use snafu::{ResultExt, ensure};
34use table::requests::{
35 SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_ELASTICSEARCH,
36};
37use vrl::value::Value as VrlValue;
38
39use crate::error::{
40 InvalidElasticsearchInputSnafu, ParseJsonSnafu, Result as ServersResult,
41 status_code_to_http_status,
42};
43use crate::http::event::{
44 LogIngesterQueryParams, LogState, PipelineIngestRequest,
45 extract_pipeline_params_map_from_headers, ingest_logs_inner,
46};
47use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
48use crate::metrics::{
49 METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
50};
51
52static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
54 HeaderMap::from_iter([
55 (
56 axum::http::header::CONTENT_TYPE,
57 HeaderValue::from_static("application/json"),
58 ),
59 (
60 HeaderName::from_static("x-elastic-product"),
61 HeaderValue::from_static("Elasticsearch"),
62 ),
63 ])
64});
65
66const ELASTICSEARCH_VERSION: &str = "8.16.0";
68
69#[axum_macros::debug_handler]
71pub async fn handle_get_version() -> impl IntoResponse {
72 let body = serde_json::json!({
73 "version": {
74 "number": ELASTICSEARCH_VERSION
75 }
76 });
77 (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
78}
79
80#[axum_macros::debug_handler]
83pub async fn handle_get_license() -> impl IntoResponse {
84 let body = serde_json::json!({
85 "license": {
86 "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
87 "type": "oss",
88 "status": "active",
89 "expiry_date_in_millis": 4891198687000_i64,
90 }
91 });
92 (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
93}
94
95#[axum_macros::debug_handler]
98pub async fn handle_bulk_api(
99 State(log_state): State<LogState>,
100 Query(params): Query<LogIngesterQueryParams>,
101 Extension(query_ctx): Extension<QueryContext>,
102 TypedHeader(_content_type): TypedHeader<ContentType>,
103 headers: HeaderMap,
104 payload: String,
105) -> impl IntoResponse {
106 do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
107}
108
109#[axum_macros::debug_handler]
112pub async fn handle_bulk_api_with_index(
113 State(log_state): State<LogState>,
114 Path(index): Path<String>,
115 Query(params): Query<LogIngesterQueryParams>,
116 Extension(query_ctx): Extension<QueryContext>,
117 TypedHeader(_content_type): TypedHeader<ContentType>,
118 headers: HeaderMap,
119 payload: String,
120) -> impl IntoResponse {
121 do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
122}
123
124async fn do_handle_bulk_api(
125 log_state: LogState,
126 index: Option<String>,
127 params: LogIngesterQueryParams,
128 mut query_ctx: QueryContext,
129 headers: HeaderMap,
130 payload: String,
131) -> impl IntoResponse {
132 let start = Instant::now();
133 debug!(
134 "Received bulk request, params: {:?}, payload: {:?}",
135 params, payload
136 );
137
138 query_ctx.set_channel(Channel::Elasticsearch);
140 query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
141 query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_ELASTICSEARCH);
142
143 let db = query_ctx.current_schema();
144
145 let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
147 .with_label_values(&[&db])
148 .start_timer();
149
150 let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
152 headers
153 .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
154 .and_then(|v| v.to_str().ok())
155 .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
156 });
157
158 let requests = match parse_bulk_request(&payload, &index, ¶ms.msg_field) {
160 Ok(requests) => requests,
161 Err(e) => {
162 return (
163 StatusCode::BAD_REQUEST,
164 elasticsearch_headers(),
165 axum::Json(write_bulk_response(
166 start.elapsed().as_millis() as i64,
167 0,
168 StatusCode::BAD_REQUEST.as_u16() as u32,
169 e.to_string().as_str(),
170 )),
171 );
172 }
173 };
174 let log_num = requests.len();
175
176 let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
177 Ok(pipeline) => pipeline,
178 Err(e) => {
179 error!(e; "Failed to ingest logs");
181 return (
182 status_code_to_http_status(&e.status_code()),
183 elasticsearch_headers(),
184 axum::Json(write_bulk_response(
185 start.elapsed().as_millis() as i64,
186 0,
187 e.status_code() as u32,
188 e.to_string().as_str(),
189 )),
190 );
191 }
192 };
193 let pipeline_params =
194 GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
195 if let Err(e) = ingest_logs_inner(
196 log_state.log_handler,
197 pipeline,
198 requests,
199 Arc::new(query_ctx),
200 pipeline_params,
201 )
202 .await
203 {
204 error!(e; "Failed to ingest logs");
205 return (
206 status_code_to_http_status(&e.status_code()),
207 elasticsearch_headers(),
208 axum::Json(write_bulk_response(
209 start.elapsed().as_millis() as i64,
210 0,
211 e.status_code() as u32,
212 e.to_string().as_str(),
213 )),
214 );
215 }
216
217 METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
219 .with_label_values(&[&db])
220 .inc_by(log_num as u64);
221
222 (
223 StatusCode::OK,
224 elasticsearch_headers(),
225 axum::Json(write_bulk_response(
226 start.elapsed().as_millis() as i64,
227 log_num,
228 StatusCode::CREATED.as_u16() as u32,
229 "",
230 )),
231 )
232}
233
234fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
253 if error_reason.is_empty() {
254 let items: Vec<Value> = (0..n)
255 .map(|_| {
256 json!({
257 "create": {
258 "status": status_code
259 }
260 })
261 })
262 .collect();
263 json!({
264 "took": took_ms,
265 "errors": false,
266 "items": items,
267 })
268 } else {
269 json!({
270 "took": took_ms,
271 "errors": true,
272 "items": [
273 { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
274 ]
275 })
276 }
277}
278
279pub fn elasticsearch_headers() -> HeaderMap {
281 ELASTICSEARCH_HEADERS.clone()
282}
283
284fn parse_bulk_request(
292 input: &str,
293 index_from_url: &Option<String>,
294 msg_field: &Option<String>,
295) -> ServersResult<Vec<PipelineIngestRequest>> {
296 let values: Vec<VrlValue> = Deserializer::from_str(input)
298 .into_iter::<VrlValue>()
299 .collect::<Result<_, _>>()
300 .context(ParseJsonSnafu)?;
301
302 ensure!(
304 !values.is_empty(),
305 InvalidElasticsearchInputSnafu {
306 reason: "empty bulk request".to_string(),
307 }
308 );
309
310 let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
311 let mut values = values.into_iter();
312
313 while let Some(cmd) = values.next() {
318 let mut cmd = cmd.into_object();
320 let index = if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("create")) {
321 get_index_from_cmd(cmd)?
322 } else if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("index")) {
323 get_index_from_cmd(cmd)?
324 } else {
325 return InvalidElasticsearchInputSnafu {
326 reason: format!(
327 "invalid bulk request, expected 'create' or 'index' but got {:?}",
328 cmd
329 ),
330 }
331 .fail();
332 };
333
334 if let Some(document) = values.next() {
336 let log_value = if let Some(msg_field) = msg_field {
338 get_log_value_from_msg_field(document, msg_field)
339 } else {
340 document
341 };
342
343 ensure!(
344 index.is_some() || index_from_url.is_some(),
345 InvalidElasticsearchInputSnafu {
346 reason: "missing index in bulk request".to_string(),
347 }
348 );
349
350 requests.push(PipelineIngestRequest {
351 table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
352 values: vec![log_value],
353 });
354 }
355 }
356
357 debug!(
358 "Received {} log ingest requests: {:?}",
359 requests.len(),
360 requests
361 );
362
363 Ok(requests)
364}
365
366fn get_index_from_cmd(v: VrlValue) -> ServersResult<Option<String>> {
368 let Some(index) = v.into_object().and_then(|mut m| m.remove("_index")) else {
369 return Ok(None);
370 };
371
372 if let VrlValue::Bytes(index) = index {
373 Ok(Some(String::from_utf8_lossy(&index).to_string()))
374 } else {
375 InvalidElasticsearchInputSnafu {
377 reason: "index is not a string in bulk request",
378 }
379 .fail()
380 }
381}
382
383fn get_log_value_from_msg_field(v: VrlValue, msg_field: &str) -> VrlValue {
386 let VrlValue::Object(mut m) = v else {
387 return v;
388 };
389
390 if let Some(message) = m.remove(msg_field) {
391 match message {
392 VrlValue::Bytes(bytes) => {
393 match serde_json::from_slice::<VrlValue>(&bytes) {
394 Ok(v) => v,
395 Err(_) => {
397 let map = BTreeMap::from([(
398 msg_field.to_string().into(),
399 VrlValue::Bytes(bytes),
400 )]);
401 VrlValue::Object(map)
402 }
403 }
404 }
405 _ => message,
407 }
408 } else {
409 VrlValue::Object(m)
411 }
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven coverage of `parse_bulk_request`: each case is
    /// (payload, index from URL, msg_field, expected result). For `Err` cases only
    /// failure is asserted, not the specific reason.
    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
            // Two `create` actions, each naming its own index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // An explicit `_index` on the action wins over the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // `index` action without `_index` falls back to the URL index.
            (
                r#"
                {"index":{"_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // A trailing action with no document line is ignored.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into()],
                }]),
            ),
            // msg_field holding embedded JSON is expanded.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // msg_field holding plain text is kept under its own key.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""}).into(),
                        ],
                    },
                ]),
            ),
            // msg_field configured but absent: the whole document is kept.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![json!({"foo1": "foo1_value"}).into()],
                }]),
            ),
            // Unsupported action.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "invalid bulk request".to_string(),
                }),
            ),
            // No index anywhere.
            (
                r#"
                {"create":{"_id":"1"}}
                {"foo1":"foo1_value"}
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "missing index".to_string(),
                }),
            ),
            // `_index` of the wrong type.
            (
                r#"
                {"create":{"_index":1,"_id":"1"}}
                {"foo1":"foo1_value"}
                "#,
                Some("logs".to_string()),
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "non-string index".to_string(),
                }),
            ),
            // Empty (whitespace-only) payload.
            (
                "  \n  ",
                Some("logs".to_string()),
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "empty bulk request".to_string(),
                }),
            ),
            // Malformed JSON.
            (
                r#"
                {"create":{"_index":"test"}
                {"foo1":
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "malformed json".to_string(),
                }),
            ),
        ];

        for (i, (input, index, msg_field, expected)) in test_cases.into_iter().enumerate() {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if let Ok(expected) = expected {
                assert_eq!(requests.unwrap(), expected, "case {i}");
            } else {
                assert!(requests.is_err(), "case {i} should fail, got {requests:?}");
            }
        }
    }

    #[test]
    fn test_write_bulk_response() {
        assert_eq!(
            write_bulk_response(5, 2, 201, ""),
            json!({
                "took": 5,
                "errors": false,
                "items": [
                    {"create": {"status": 201}},
                    {"create": {"status": 201}}
                ],
            })
        );
        // `n` is ignored on the error path: exactly one failed item is emitted.
        assert_eq!(
            write_bulk_response(7, 3, 400, "boom"),
            json!({
                "took": 7,
                "errors": true,
                "items": [
                    {"create": {"status": 400, "error": {"type": "illegal_argument_exception", "reason": "boom"}}}
                ],
            })
        );
    }

    #[test]
    fn test_get_log_value_from_msg_field() {
        // Non-object documents pass through untouched.
        let scalar: VrlValue = json!("plain").into();
        assert_eq!(get_log_value_from_msg_field(scalar.clone(), "data"), scalar);

        // A non-string message field replaces the whole document.
        let nested: VrlValue = json!({"data": {"a": 1}, "b": 2}).into();
        let expected: VrlValue = json!({"a": 1}).into();
        assert_eq!(get_log_value_from_msg_field(nested, "data"), expected);
    }
}