1use std::sync::Arc;
16use std::time::Instant;
17
18use axum::extract::{Path, Query, State};
19use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
20use axum::response::IntoResponse;
21use axum::Extension;
22use axum_extra::TypedHeader;
23use common_error::ext::ErrorExt;
24use common_telemetry::{debug, error};
25use headers::ContentType;
26use once_cell::sync::Lazy;
27use pipeline::{PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME};
28use serde_json::{json, Deserializer, Value};
29use session::context::{Channel, QueryContext};
30use snafu::{ensure, ResultExt};
31
32use crate::error::{
33 status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
34 Result as ServersResult,
35};
36use crate::http::event::{
37 ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest,
38};
39use crate::metrics::{
40 METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
41};
42
43static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
45 HeaderMap::from_iter([
46 (
47 axum::http::header::CONTENT_TYPE,
48 HeaderValue::from_static("application/json"),
49 ),
50 (
51 HeaderName::from_static("x-elastic-product"),
52 HeaderValue::from_static("Elasticsearch"),
53 ),
54 ])
55});
56
// Version number advertised by `handle_get_version`; clients use it to decide
// whether the server speaks a compatible Elasticsearch API.
const ELASTICSEARCH_VERSION: &str = "8.16.0";
59
60#[axum_macros::debug_handler]
62pub async fn handle_get_version() -> impl IntoResponse {
63 let body = serde_json::json!({
64 "version": {
65 "number": ELASTICSEARCH_VERSION
66 }
67 });
68 (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
69}
70
71#[axum_macros::debug_handler]
74pub async fn handle_get_license() -> impl IntoResponse {
75 let body = serde_json::json!({
76 "license": {
77 "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
78 "type": "oss",
79 "status": "active",
80 "expiry_date_in_millis": 4891198687000_i64,
81 }
82 });
83 (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
84}
85
/// Handler for the bulk endpoint without an index in the URL path.
///
/// Delegates to `do_handle_bulk_api` with `index` set to `None`, so every
/// document's bulk command line must carry its own `_index` (enforced in
/// `parse_bulk_request`).
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    // Content type is required by the extractor but otherwise unused here.
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
}
99
/// Handler for the bulk endpoint with an index in the URL path.
///
/// Delegates to `do_handle_bulk_api`; the path `index` serves as the fallback
/// table for documents whose bulk command line omits `_index`.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
    State(log_state): State<LogState>,
    Path(index): Path<String>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    // Content type is required by the extractor but otherwise unused here.
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
}
114
115async fn do_handle_bulk_api(
116 log_state: LogState,
117 index: Option<String>,
118 params: LogIngesterQueryParams,
119 mut query_ctx: QueryContext,
120 headers: HeaderMap,
121 payload: String,
122) -> impl IntoResponse {
123 let start = Instant::now();
124 debug!(
125 "Received bulk request, params: {:?}, payload: {:?}",
126 params, payload
127 );
128
129 query_ctx.set_channel(Channel::Elasticsearch);
131
132 let db = params.db.unwrap_or_else(|| "public".to_string());
133
134 let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
136 .with_label_values(&[&db])
137 .start_timer();
138
139 let pipeline_name = if let Some(pipeline) = params.pipeline_name {
141 pipeline
142 } else {
143 GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
144 };
145
146 let requests = match parse_bulk_request(&payload, &index, ¶ms.msg_field) {
148 Ok(requests) => requests,
149 Err(e) => {
150 return (
151 StatusCode::BAD_REQUEST,
152 elasticsearch_headers(),
153 axum::Json(write_bulk_response(
154 start.elapsed().as_millis() as i64,
155 0,
156 StatusCode::BAD_REQUEST.as_u16() as u32,
157 e.to_string().as_str(),
158 )),
159 );
160 }
161 };
162 let log_num = requests.len();
163
164 let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
165 Ok(pipeline) => pipeline,
166 Err(e) => {
167 error!(e; "Failed to ingest logs");
169 return (
170 status_code_to_http_status(&e.status_code()),
171 elasticsearch_headers(),
172 axum::Json(write_bulk_response(
173 start.elapsed().as_millis() as i64,
174 0,
175 e.status_code() as u32,
176 e.to_string().as_str(),
177 )),
178 );
179 }
180 };
181 if let Err(e) = ingest_logs_inner(
182 log_state.log_handler,
183 pipeline,
184 requests,
185 Arc::new(query_ctx),
186 headers,
187 )
188 .await
189 {
190 error!(e; "Failed to ingest logs");
191 return (
192 status_code_to_http_status(&e.status_code()),
193 elasticsearch_headers(),
194 axum::Json(write_bulk_response(
195 start.elapsed().as_millis() as i64,
196 0,
197 e.status_code() as u32,
198 e.to_string().as_str(),
199 )),
200 );
201 }
202
203 METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
205 .with_label_values(&[&db])
206 .inc_by(log_num as u64);
207
208 (
209 StatusCode::OK,
210 elasticsearch_headers(),
211 axum::Json(write_bulk_response(
212 start.elapsed().as_millis() as i64,
213 log_num,
214 StatusCode::CREATED.as_u16() as u32,
215 "",
216 )),
217 )
218}
219
220fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
239 if error_reason.is_empty() {
240 let items: Vec<Value> = (0..n)
241 .map(|_| {
242 json!({
243 "create": {
244 "status": status_code
245 }
246 })
247 })
248 .collect();
249 json!({
250 "took": took_ms,
251 "errors": false,
252 "items": items,
253 })
254 } else {
255 json!({
256 "took": took_ms,
257 "errors": true,
258 "items": [
259 { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
260 ]
261 })
262 }
263}
264
265pub fn elasticsearch_headers() -> HeaderMap {
267 ELASTICSEARCH_HEADERS.clone()
268}
269
270fn parse_bulk_request(
278 input: &str,
279 index_from_url: &Option<String>,
280 msg_field: &Option<String>,
281) -> ServersResult<Vec<PipelineIngestRequest>> {
282 let values: Vec<Value> = Deserializer::from_str(input)
284 .into_iter::<Value>()
285 .collect::<Result<_, _>>()
286 .context(ParseJsonSnafu)?;
287
288 ensure!(
290 !values.is_empty(),
291 InvalidElasticsearchInputSnafu {
292 reason: "empty bulk request".to_string(),
293 }
294 );
295
296 let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
297 let mut values = values.into_iter();
298
299 while let Some(mut cmd) = values.next() {
304 let index = if let Some(cmd) = cmd.get_mut("create") {
306 get_index_from_cmd(cmd.take())?
307 } else if let Some(cmd) = cmd.get_mut("index") {
308 get_index_from_cmd(cmd.take())?
309 } else {
310 return InvalidElasticsearchInputSnafu {
311 reason: format!(
312 "invalid bulk request, expected 'create' or 'index' but got {:?}",
313 cmd
314 ),
315 }
316 .fail();
317 };
318
319 if let Some(document) = values.next() {
321 let log_value = if let Some(msg_field) = msg_field {
323 get_log_value_from_msg_field(document, msg_field)
324 } else {
325 document
326 };
327
328 ensure!(
329 index.is_some() || index_from_url.is_some(),
330 InvalidElasticsearchInputSnafu {
331 reason: "missing index in bulk request".to_string(),
332 }
333 );
334
335 let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
336 requests.push(PipelineIngestRequest {
337 table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
338 values: vec![log_value],
339 });
340 }
341 }
342
343 debug!(
344 "Received {} log ingest requests: {:?}",
345 requests.len(),
346 requests
347 );
348
349 Ok(requests)
350}
351
352fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
354 if let Some(index) = v.get_mut("_index") {
355 if let Value::String(index) = index.take() {
356 Ok(Some(index))
357 } else {
358 InvalidElasticsearchInputSnafu {
360 reason: "index is not a string in bulk request".to_string(),
361 }
362 .fail()
363 }
364 } else {
365 Ok(None)
366 }
367}
368
369fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
372 if let Some(message) = v.get_mut(msg_field) {
373 let message = message.take();
374 match message {
375 Value::String(s) => match serde_json::from_str::<Value>(&s) {
376 Ok(s) => s,
377 Err(_) => json!({msg_field: s}),
379 },
380 _ => message,
382 }
383 } else {
384 v
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_bulk_request() {
        // Each case: (payload, index_from_url, msg_field, expected).
        let test_cases = vec![
            // Plain bulk request: every `create` command carries its own `_index`.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // The command's `_index` takes precedence over the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // `index` commands without `_index` fall back to the URL index.
            (
                r#"
                {"index":{"_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"index":{"_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // A dangling command with no following document is skipped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                ]),
            ),
            // `msg_field` holding a JSON-encoded string replaces the document.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Logstash-style payload: non-JSON `message` strings are re-wrapped.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
                        ],
                    },
                ]),
            ),
            // Unknown command keyword is rejected.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's a invalid bulk request".to_string(),
                }),
            ),
            // No `_index` anywhere and no URL index is an error.
            (
                r#"
                {"create":{}}
                {"foo1":"foo1_value"}
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "missing index in bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                assert!(requests.is_err());
            }
        }
    }
}