use std::sync::Arc;
use std::time::Instant;

use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_telemetry::{debug, error};
use headers::ContentType;
use once_cell::sync::Lazy;
use pipeline::{
    GreptimePipelineParams, PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use serde_json::{json, Deserializer, Value};
use session::context::{Channel, QueryContext};
use snafu::{ensure, ResultExt};

use crate::error::{
    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
    Result as ServersResult,
};
use crate::http::event::{
    extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
    PipelineIngestRequest,
};
use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
use crate::metrics::{
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};

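/// Headers attached to every Elasticsearch-compatible response. The
/// `x-elastic-product: Elasticsearch` header lets Elasticsearch clients accept
/// this endpoint as a compatible server.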
static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
    HeaderMap::from_iter([
        (
            axum::http::header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        ),
        (
            HeaderName::from_static("x-elastic-product"),
            HeaderValue::from_static("Elasticsearch"),
        ),
    ])
});

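/// The mocked Elasticsearch version reported by `handle_get_version`.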
const ELASTICSEARCH_VERSION: &str = "8.16.0";

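/// Returns a mocked Elasticsearch version so that ingestion clients treat this
/// endpoint as a compatible Elasticsearch instance.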
#[axum_macros::debug_handler]
pub async fn handle_get_version() -> impl IntoResponse {
    let body = serde_json::json!({
        "version": {
            "number": ELASTICSEARCH_VERSION
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

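/// Returns a mocked OSS license so that clients' license checks pass.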
#[axum_macros::debug_handler]
pub async fn handle_get_license() -> impl IntoResponse {
    let body = serde_json::json!({
        "license": {
            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
            "type": "oss",
            "status": "active",
            "expiry_date_in_millis": 4891198687000_i64,
        }
    });
    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}

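/// Handles bulk requests that do not carry an index in the URL; every action
/// line in the payload must provide its own `_index`.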
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
}

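/// Handles bulk requests with an index in the URL path; the path index is used
/// for documents whose action line omits `_index`.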
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
    State(log_state): State<LogState>,
    Path(index): Path<String>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
}

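/// Shared implementation of the two bulk handlers: parses the payload, resolves
/// the pipeline, ingests the documents, and replies in Elasticsearch's
/// bulk-response format.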
async fn do_handle_bulk_api(
    log_state: LogState,
    index: Option<String>,
    params: LogIngesterQueryParams,
    mut query_ctx: QueryContext,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    let start = Instant::now();
    debug!(
        "Received bulk request, params: {:?}, payload: {:?}",
        params, payload
    );

    query_ctx.set_channel(Channel::Elasticsearch);

    let db = query_ctx.current_schema();

    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
        .with_label_values(&[&db])
        .start_timer();

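    // Resolve the pipeline name: the `pipeline_name` query parameter wins, then the
    // pipeline-name request header, and finally the built-in identity pipeline.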
    let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
        headers
            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
            .and_then(|v| v.to_str().ok())
            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
    });

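    // Parse the bulk payload; a malformed payload is answered with a 400 in
    // Elasticsearch's bulk-response format.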
    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
        Ok(requests) => requests,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    StatusCode::BAD_REQUEST.as_u16() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
    let log_num = requests.len();

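    // Resolve the pipeline definition from the name; failures are reported back
    // to the client in bulk-response format.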
    let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
        Ok(pipeline) => pipeline,
        Err(e) => {
            error!(e; "Failed to ingest logs");
            return (
                status_code_to_http_status(&e.status_code()),
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    e.status_code() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
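    // Extract optional pipeline parameters from the request headers and run the ingestion.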
    let pipeline_params =
        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
    if let Err(e) = ingest_logs_inner(
        log_state.log_handler,
        pipeline,
        requests,
        Arc::new(query_ctx),
        pipeline_params,
    )
    .await
    {
        error!(e; "Failed to ingest logs");
        return (
            status_code_to_http_status(&e.status_code()),
            elasticsearch_headers(),
            axum::Json(write_bulk_response(
                start.elapsed().as_millis() as i64,
                0,
                e.status_code() as u32,
                e.to_string().as_str(),
            )),
        );
    }

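    // Record how many documents were ingested for this database.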
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
        .with_label_values(&[&db])
        .inc_by(log_num as u64);

    (
        StatusCode::OK,
        elasticsearch_headers(),
        axum::Json(write_bulk_response(
            start.elapsed().as_millis() as i64,
            log_num,
            StatusCode::CREATED.as_u16() as u32,
            "",
        )),
    )
}

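/// Builds an Elasticsearch bulk-response body. On success every document gets its
/// own `create` item with the given status; on failure a single error item carrying
/// `error_reason` is returned and `errors` is set to true.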
fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
    if error_reason.is_empty() {
        let items: Vec<Value> = (0..n)
            .map(|_| {
                json!({
                    "create": {
                        "status": status_code
                    }
                })
            })
            .collect();
        json!({
            "took": took_ms,
            "errors": false,
            "items": items,
        })
    } else {
        json!({
            "took": took_ms,
            "errors": true,
            "items": [
                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
            ]
        })
    }
}

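/// Returns the headers shared by all Elasticsearch-compatible responses.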
pub fn elasticsearch_headers() -> HeaderMap {
    ELASTICSEARCH_HEADERS.clone()
}

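/// Parses an Elasticsearch bulk payload (newline-delimited JSON) into
/// `PipelineIngestRequest`s. Each `create` or `index` action line is followed by a
/// document line; the target table is taken from the action's `_index` field, or
/// from the index in the URL when the action does not carry one.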
fn parse_bulk_request(
    input: &str,
    index_from_url: &Option<String>,
    msg_field: &Option<String>,
) -> ServersResult<Vec<PipelineIngestRequest>> {
    let values: Vec<Value> = Deserializer::from_str(input)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .context(ParseJsonSnafu)?;

    ensure!(
        !values.is_empty(),
        InvalidElasticsearchInputSnafu {
            reason: "empty bulk request".to_string(),
        }
    );

    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
    let mut values = values.into_iter();

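    // The payload alternates between an action line (`create`/`index`) and a
    // document line, so consume the stream two values at a time.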
    while let Some(mut cmd) = values.next() {
        let index = if let Some(cmd) = cmd.get_mut("create") {
            get_index_from_cmd(cmd.take())?
        } else if let Some(cmd) = cmd.get_mut("index") {
            get_index_from_cmd(cmd.take())?
        } else {
            return InvalidElasticsearchInputSnafu {
                reason: format!(
                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
                    cmd
                ),
            }
            .fail();
        };

        if let Some(document) = values.next() {
            let log_value = if let Some(msg_field) = msg_field {
                get_log_value_from_msg_field(document, msg_field)
            } else {
                document
            };

            ensure!(
                index.is_some() || index_from_url.is_some(),
                InvalidElasticsearchInputSnafu {
                    reason: "missing index in bulk request".to_string(),
                }
            );

            let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
            requests.push(PipelineIngestRequest {
                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
                values: vec![log_value],
            });
        }
    }

    debug!(
        "Received {} log ingest requests: {:?}",
        requests.len(),
        requests
    );

    Ok(requests)
}

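/// Extracts the `_index` field from a bulk action body, if present.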
fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
    if let Some(index) = v.get_mut("_index") {
        if let Value::String(index) = index.take() {
            Ok(Some(index))
        } else {
            InvalidElasticsearchInputSnafu {
                reason: "index is not a string in bulk request".to_string(),
            }
            .fail()
        }
    } else {
        Ok(None)
    }
}

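/// If `msg_field` is present in the document, only that field is kept as the log
/// value: a string is parsed as JSON when possible and otherwise wrapped back under
/// `msg_field`; a non-string value is used as-is. Without the field, the whole
/// document is returned unchanged.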
fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
    if let Some(message) = v.get_mut(msg_field) {
        let message = message.take();
        match message {
            Value::String(s) => match serde_json::from_str::<Value>(&s) {
                Ok(s) => s,
                Err(_) => json!({msg_field: s}),
            },
            _ => message,
        }
    } else {
        v
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
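            // Multiple `create` actions, each carrying its own `_index`.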
            (
                r#"
            {"create":{"_index":"test","_id":"1"}}
            {"foo1":"foo1_value", "bar1":"bar1_value"}
            {"create":{"_index":"test","_id":"2"}}
            {"foo2":"foo2_value","bar2":"bar2_value"}
            "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
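            // `_index` in the action line takes precedence over the index from the URL.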
            (
                r#"
            {"create":{"_index":"test","_id":"1"}}
            {"foo1":"foo1_value", "bar1":"bar1_value"}
            {"create":{"_index":"logs","_id":"2"}}
            {"foo2":"foo2_value","bar2":"bar2_value"}
            "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
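            // The `index` action is accepted and handled the same way as `create`.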
            (
                r#"
            {"index":{"_index":"test","_id":"1"}}
            {"foo1":"foo1_value", "bar1":"bar1_value"}
            {"index":{"_index":"logs","_id":"2"}}
            {"foo2":"foo2_value","bar2":"bar2_value"}
            "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
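            // An action line without a following document is skipped.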
            (
                r#"
            {"create":{"_index":"test","_id":"1"}}
            {"foo1":"foo1_value", "bar1":"bar1_value"}
            {"create":{"_index":"logs","_id":"2"}}
            "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                ]),
            ),
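            // With `msg_field` set, the named field is parsed as JSON and replaces the whole document.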
            (
                r#"
            {"create":{"_index":"test","_id":"1"}}
            {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
            {"create":{"_index":"test","_id":"2"}}
            {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
            "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
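            // With `msg_field` set, a plain (non-JSON) string stays wrapped under `msg_field`.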
            (
                r#"
            {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
            {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
            {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
            {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
            "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
                        ],
                    },
                ]),
            ),
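            // Anything other than `create` or `index` is rejected.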
            (
                r#"
            { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
            { "foo1" : "foo1_value", "bar1" : "bar1_value" }
            "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's an invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                assert!(requests.is_err());
            }
        }
    }
}