use std::sync::Arc;
use std::time::Instant;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_telemetry::{debug, error};
use headers::ContentType;
use once_cell::sync::Lazy;
use pipeline::{PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME};
use serde_json::{json, Deserializer, Value};
use session::context::{Channel, QueryContext};
use snafu::{ensure, ResultExt};
use crate::error::{
status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
Result as ServersResult,
};
use crate::http::event::{ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState};
use crate::metrics::{
METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};
/// Canned headers attached to every Elasticsearch-compatible response:
/// a JSON content type plus the `x-elastic-product` marker
/// (presumably checked by ES clients to recognize the server — confirm).
static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
    HeaderMap::from_iter([
        (
            axum::http::header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        ),
        (
            HeaderName::from_static("x-elastic-product"),
            HeaderValue::from_static("Elasticsearch"),
        ),
    ])
});

/// Elasticsearch version string reported by `handle_get_version`.
const ELASTICSEARCH_VERSION: &str = "8.16.0";
#[axum_macros::debug_handler]
pub async fn handle_get_version() -> impl IntoResponse {
let body = serde_json::json!({
"version": {
"number": ELASTICSEARCH_VERSION
}
});
(StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}
#[axum_macros::debug_handler]
pub async fn handle_get_license() -> impl IntoResponse {
let body = serde_json::json!({
"license": {
"uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
"type": "oss",
"status": "active",
"expiry_date_in_millis": 4891198687000_i64,
}
});
(StatusCode::OK, elasticsearch_headers(), axum::Json(body))
}
/// `_bulk` endpoint without a URL-level index: no default table is supplied,
/// so every action line in the payload must carry its own `_index`.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    // Only the presence of a Content-Type header is required; its value is unused.
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
}
/// `{index}/_bulk` endpoint: the path segment provides the fallback table
/// for action lines that do not specify their own `_index`.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
    State(log_state): State<LogState>,
    Path(index): Path<String>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    // Only the presence of a Content-Type header is required; its value is unused.
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
}
/// Shared implementation behind both bulk endpoints.
///
/// Parses the NDJSON-style payload into per-table ingest requests, resolves
/// the pipeline (defaulting to the internal identity pipeline), runs the
/// ingestion, and renders an Elasticsearch-style bulk response. Every
/// response — success or failure — carries the canned ES headers and a
/// bulk-response JSON body so ES clients can parse it.
async fn do_handle_bulk_api(
    log_state: LogState,
    index: Option<String>,
    params: LogIngesterQueryParams,
    mut query_ctx: QueryContext,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    let start = Instant::now();
    debug!(
        "Received bulk request, params: {:?}, payload: {:?}",
        params, payload
    );

    // Tag the query context so downstream components can tell this request
    // arrived over the Elasticsearch-compatible channel.
    query_ctx.set_channel(Channel::Elasticsearch);

    let db = params.db.unwrap_or_else(|| "public".to_string());

    // Per-database ingestion latency; the timer observes on drop, covering
    // the whole function regardless of which exit path is taken.
    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
        .with_label_values(&[&db])
        .start_timer();

    // Fall back to the built-in identity pipeline when none was requested.
    let pipeline_name = params
        .pipeline_name
        .unwrap_or_else(|| GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string());

    // Malformed payloads are the client's fault: report 400 with the parse
    // error embedded in a bulk-response body.
    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
        Ok(requests) => requests,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    StatusCode::BAD_REQUEST.as_u16() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };
    let log_num = requests.len();

    let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
        Ok(pipeline) => pipeline,
        Err(e) => {
            error!(e; "Failed to ingest logs");
            return (
                status_code_to_http_status(&e.status_code()),
                elasticsearch_headers(),
                axum::Json(write_bulk_response(
                    start.elapsed().as_millis() as i64,
                    0,
                    e.status_code() as u32,
                    e.to_string().as_str(),
                )),
            );
        }
    };

    if let Err(e) = ingest_logs_inner(
        log_state.log_handler,
        pipeline,
        requests,
        Arc::new(query_ctx),
        headers,
    )
    .await
    {
        error!(e; "Failed to ingest logs");
        return (
            status_code_to_http_status(&e.status_code()),
            elasticsearch_headers(),
            axum::Json(write_bulk_response(
                start.elapsed().as_millis() as i64,
                0,
                e.status_code() as u32,
                e.to_string().as_str(),
            )),
        );
    }

    // Only count documents that were actually ingested.
    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
        .with_label_values(&[&db])
        .inc_by(log_num as u64);

    // Elasticsearch reports per-item 201 CREATED on successful bulk writes.
    (
        StatusCode::OK,
        elasticsearch_headers(),
        axum::Json(write_bulk_response(
            start.elapsed().as_millis() as i64,
            log_num,
            StatusCode::CREATED.as_u16() as u32,
            "",
        )),
    )
}
/// Builds an Elasticsearch-style bulk response body.
///
/// With an empty `error_reason`, emits `n` successful `create` items sharing
/// `status_code` and `errors: false`. Otherwise emits a single failed item
/// wrapping the reason as an `illegal_argument_exception`.
fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
    // Failure shape: one item carrying the error detail.
    if !error_reason.is_empty() {
        return json!({
            "took": took_ms,
            "errors": true,
            "items": [
                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
            ]
        });
    }

    // Success shape: one uniform item per ingested document.
    let mut items = Vec::with_capacity(n);
    for _ in 0..n {
        items.push(json!({
            "create": {
                "status": status_code
            }
        }));
    }
    json!({
        "took": took_ms,
        "errors": false,
        "items": items,
    })
}
/// Returns a fresh copy of the canned Elasticsearch response headers
/// (`content-type: application/json` and `x-elastic-product: Elasticsearch`).
pub fn elasticsearch_headers() -> HeaderMap {
    ELASTICSEARCH_HEADERS.clone()
}
/// Parses an Elasticsearch bulk payload into per-table ingest requests.
///
/// The body is a stream of whitespace-separated JSON values alternating
/// between action lines (`create`/`index`, optionally carrying `_index`) and
/// document lines. `index_from_url` is the fallback table when an action line
/// omits `_index`; `msg_field`, when set, replaces each document with that
/// field's (possibly JSON-encoded) content.
///
/// # Errors
/// Fails on invalid JSON, an empty payload, an unknown action type, or a
/// document with no resolvable index.
fn parse_bulk_request(
    input: &str,
    index_from_url: &Option<String>,
    msg_field: &Option<String>,
) -> ServersResult<Vec<LogIngestRequest>> {
    // Deserialize the whole stream up front; any malformed value aborts.
    let parsed: Vec<Value> = Deserializer::from_str(input)
        .into_iter::<Value>()
        .collect::<Result<_, _>>()
        .context(ParseJsonSnafu)?;

    ensure!(
        !parsed.is_empty(),
        InvalidElasticsearchInputSnafu {
            reason: "empty bulk request".to_string(),
        }
    );

    // Every request consumes an action line plus a document line,
    // hence the halved capacity estimate.
    let mut requests: Vec<LogIngestRequest> = Vec::with_capacity(parsed.len() / 2);
    let mut stream = parsed.into_iter();

    while let Some(mut action) = stream.next() {
        // Only `create` and `index` actions are supported.
        let index = if let Some(create) = action.get_mut("create") {
            get_index_from_cmd(create.take())?
        } else if let Some(index_cmd) = action.get_mut("index") {
            get_index_from_cmd(index_cmd.take())?
        } else {
            return InvalidElasticsearchInputSnafu {
                reason: format!(
                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
                    action
                ),
            }
            .fail();
        };

        // A trailing action with no document line is silently dropped.
        let document = match stream.next() {
            Some(document) => document,
            None => break,
        };

        // The table must come from either the action line or the URL.
        ensure!(
            index.is_some() || index_from_url.is_some(),
            InvalidElasticsearchInputSnafu {
                reason: "missing index in bulk request".to_string(),
            }
        );

        let log_value = match msg_field {
            Some(field) => get_log_value_from_msg_field(document, field),
            None => document,
        };
        let row = pipeline::json_to_map(log_value).context(PipelineSnafu)?;

        requests.push(LogIngestRequest {
            // The ensure above guarantees one of the two sources is present.
            table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
            values: vec![row],
        });
    }

    debug!(
        "Received {} log ingest requests: {:?}",
        requests.len(),
        requests
    );

    Ok(requests)
}
/// Extracts the `_index` field from a bulk action value.
///
/// Returns `Ok(None)` when the field is absent, `Ok(Some(name))` when it is
/// a string, and an error for any other JSON type.
fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
    match v.get_mut("_index") {
        None => Ok(None),
        Some(field) => match field.take() {
            Value::String(name) => Ok(Some(name)),
            _ => InvalidElasticsearchInputSnafu {
                reason: "index is not a string in bulk request".to_string(),
            }
            .fail(),
        },
    }
}
/// Narrows a document down to its `msg_field` content.
///
/// If the field holds a string that parses as JSON, that parsed value replaces
/// the whole document; a non-JSON string is re-wrapped as `{msg_field: s}`.
/// A non-string field value is returned as-is, and a document without the
/// field is returned unchanged. Note: sibling fields are dropped whenever the
/// field is present.
fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
    let message = match v.get_mut(msg_field) {
        Some(slot) => slot.take(),
        None => return v,
    };
    match message {
        Value::String(raw) => {
            serde_json::from_str::<Value>(&raw).unwrap_or_else(|_| json!({msg_field: raw}))
        }
        other => other,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven coverage for `parse_bulk_request`:
    /// each case is (payload, index-from-URL, msg_field, expected result).
    /// NOTE: a byte-identical duplicate of the second case was removed.
    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
            // Both documents name their own `_index`; no URL index needed.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                        ],
                    },
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // `_index` on the action line takes precedence over the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    LogIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // A trailing action line with no document is silently dropped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                ]),
            ),
            // `msg_field` containing embedded JSON replaces the whole document;
            // sibling fields ("not_data") are dropped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    LogIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Realistic logstash-style payload: `message` is a plain (non-JSON)
            // string, so it is re-wrapped as {"message": ...}.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    LogIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                        ],
                    },
                    LogIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
                        ],
                    },
                ]),
            ),
            // Unknown action type is rejected.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's a invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                // Error values aren't directly comparable; asserting failure
                // is sufficient for the rejection cases.
                assert!(requests.is_err());
            }
        }
    }
}