servers/
elasticsearch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use axum::extract::{Path, Query, State};
19use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
20use axum::response::IntoResponse;
21use axum::Extension;
22use axum_extra::TypedHeader;
23use common_error::ext::ErrorExt;
24use common_telemetry::{debug, error};
25use headers::ContentType;
26use once_cell::sync::Lazy;
27use pipeline::{
28    GreptimePipelineParams, PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
29};
30use serde_json::{json, Deserializer, Value};
31use session::context::{Channel, QueryContext};
32use snafu::{ensure, ResultExt};
33
34use crate::error::{
35    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
36    Result as ServersResult,
37};
38use crate::http::event::{
39    extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
40    PipelineIngestRequest,
41};
42use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
43use crate::metrics::{
44    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
45};
46
47// The headers for every response of Elasticsearch API.
48static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
49    HeaderMap::from_iter([
50        (
51            axum::http::header::CONTENT_TYPE,
52            HeaderValue::from_static("application/json"),
53        ),
54        (
55            HeaderName::from_static("x-elastic-product"),
56            HeaderValue::from_static("Elasticsearch"),
57        ),
58    ])
59});
60
// The fake Elasticsearch version reported by the ping/`_version` API —
// presumably so Elasticsearch clients' version checks pass; it does not
// correspond to any real Elasticsearch deployment.
const ELASTICSEARCH_VERSION: &str = "8.16.0";
63
64// Return fake response for Elasticsearch ping request.
65#[axum_macros::debug_handler]
66pub async fn handle_get_version() -> impl IntoResponse {
67    let body = serde_json::json!({
68        "version": {
69            "number": ELASTICSEARCH_VERSION
70        }
71    });
72    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
73}
74
75// Return fake response for Elasticsearch license request.
76// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
77#[axum_macros::debug_handler]
78pub async fn handle_get_license() -> impl IntoResponse {
79    let body = serde_json::json!({
80        "license": {
81            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
82            "type": "oss",
83            "status": "active",
84            "expiry_date_in_millis": 4891198687000_i64,
85        }
86    });
87    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
88}
89
90/// Process `_bulk` API requests. Only support to create logs.
91/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
92#[axum_macros::debug_handler]
93pub async fn handle_bulk_api(
94    State(log_state): State<LogState>,
95    Query(params): Query<LogIngesterQueryParams>,
96    Extension(query_ctx): Extension<QueryContext>,
97    TypedHeader(_content_type): TypedHeader<ContentType>,
98    headers: HeaderMap,
99    payload: String,
100) -> impl IntoResponse {
101    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
102}
103
104/// Process `/${index}/_bulk` API requests. Only support to create logs.
105/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
106#[axum_macros::debug_handler]
107pub async fn handle_bulk_api_with_index(
108    State(log_state): State<LogState>,
109    Path(index): Path<String>,
110    Query(params): Query<LogIngesterQueryParams>,
111    Extension(query_ctx): Extension<QueryContext>,
112    TypedHeader(_content_type): TypedHeader<ContentType>,
113    headers: HeaderMap,
114    payload: String,
115) -> impl IntoResponse {
116    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
117}
118
119async fn do_handle_bulk_api(
120    log_state: LogState,
121    index: Option<String>,
122    params: LogIngesterQueryParams,
123    mut query_ctx: QueryContext,
124    headers: HeaderMap,
125    payload: String,
126) -> impl IntoResponse {
127    let start = Instant::now();
128    debug!(
129        "Received bulk request, params: {:?}, payload: {:?}",
130        params, payload
131    );
132
133    // The `schema` is already set in the query_ctx in auth process.
134    query_ctx.set_channel(Channel::Elasticsearch);
135
136    let db = query_ctx.current_schema();
137
138    // Record the ingestion time histogram.
139    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
140        .with_label_values(&[&db])
141        .start_timer();
142
143    // If pipeline_name is not provided, use the internal pipeline.
144    let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
145        headers
146            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
147            .and_then(|v| v.to_str().ok())
148            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
149    });
150
151    // Read the ndjson payload and convert it to a vector of Value.
152    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
153        Ok(requests) => requests,
154        Err(e) => {
155            return (
156                StatusCode::BAD_REQUEST,
157                elasticsearch_headers(),
158                axum::Json(write_bulk_response(
159                    start.elapsed().as_millis() as i64,
160                    0,
161                    StatusCode::BAD_REQUEST.as_u16() as u32,
162                    e.to_string().as_str(),
163                )),
164            );
165        }
166    };
167    let log_num = requests.len();
168
169    let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
170        Ok(pipeline) => pipeline,
171        Err(e) => {
172            // should be unreachable
173            error!(e; "Failed to ingest logs");
174            return (
175                status_code_to_http_status(&e.status_code()),
176                elasticsearch_headers(),
177                axum::Json(write_bulk_response(
178                    start.elapsed().as_millis() as i64,
179                    0,
180                    e.status_code() as u32,
181                    e.to_string().as_str(),
182                )),
183            );
184        }
185    };
186    let pipeline_params =
187        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
188    if let Err(e) = ingest_logs_inner(
189        log_state.log_handler,
190        pipeline,
191        requests,
192        Arc::new(query_ctx),
193        pipeline_params,
194    )
195    .await
196    {
197        error!(e; "Failed to ingest logs");
198        return (
199            status_code_to_http_status(&e.status_code()),
200            elasticsearch_headers(),
201            axum::Json(write_bulk_response(
202                start.elapsed().as_millis() as i64,
203                0,
204                e.status_code() as u32,
205                e.to_string().as_str(),
206            )),
207        );
208    }
209
210    // Record the number of documents ingested.
211    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
212        .with_label_values(&[&db])
213        .inc_by(log_num as u64);
214
215    (
216        StatusCode::OK,
217        elasticsearch_headers(),
218        axum::Json(write_bulk_response(
219            start.elapsed().as_millis() as i64,
220            log_num,
221            StatusCode::CREATED.as_u16() as u32,
222            "",
223        )),
224    )
225}
226
227// It will generate the following response when write _bulk request to GreptimeDB successfully:
228// {
229//     "took": 1000,
230//     "errors": false,
231//     "items": [
232//         { "create": { "status": 201 } },
233//         { "create": { "status": 201 } },
234//         ...
235//     ]
236// }
237// If the status code is not 201, it will generate the following response:
238// {
239//     "took": 1000,
240//     "errors": true,
241//     "items": [
242//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
243//     ]
244// }
245fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
246    if error_reason.is_empty() {
247        let items: Vec<Value> = (0..n)
248            .map(|_| {
249                json!({
250                    "create": {
251                        "status": status_code
252                    }
253                })
254            })
255            .collect();
256        json!({
257            "took": took_ms,
258            "errors": false,
259            "items": items,
260        })
261    } else {
262        json!({
263            "took": took_ms,
264            "errors": true,
265            "items": [
266                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
267            ]
268        })
269    }
270}
271
272/// Returns the headers for every response of Elasticsearch API.
273pub fn elasticsearch_headers() -> HeaderMap {
274    ELASTICSEARCH_HEADERS.clone()
275}
276
277// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
278// The input will be Elasticsearch bulk request in NDJSON format.
279// For example, the input will be like this:
280// { "index" : { "_index" : "test", "_id" : "1" } }
281// { "field1" : "value1" }
282// { "index" : { "_index" : "test", "_id" : "2" } }
283// { "field2" : "value2" }
284fn parse_bulk_request(
285    input: &str,
286    index_from_url: &Option<String>,
287    msg_field: &Option<String>,
288) -> ServersResult<Vec<PipelineIngestRequest>> {
289    // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
290    let values: Vec<Value> = Deserializer::from_str(input)
291        .into_iter::<Value>()
292        .collect::<Result<_, _>>()
293        .context(ParseJsonSnafu)?;
294
295    // Check if the input is empty.
296    ensure!(
297        !values.is_empty(),
298        InvalidElasticsearchInputSnafu {
299            reason: "empty bulk request".to_string(),
300        }
301    );
302
303    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
304    let mut values = values.into_iter();
305
306    // Read the ndjson payload and convert it to a (index, value) vector.
307    // For Elasticsearch post `_bulk` API, each chunk contains two objects:
308    //   1. The first object is the command, it should be `create` or `index`.
309    //   2. The second object is the document data.
310    while let Some(mut cmd) = values.next() {
311        // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
312        let index = if let Some(cmd) = cmd.get_mut("create") {
313            get_index_from_cmd(cmd.take())?
314        } else if let Some(cmd) = cmd.get_mut("index") {
315            get_index_from_cmd(cmd.take())?
316        } else {
317            return InvalidElasticsearchInputSnafu {
318                reason: format!(
319                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
320                    cmd
321                ),
322            }
323            .fail();
324        };
325
326        // Read the second object to get the document data. Stop the loop if there is no document.
327        if let Some(document) = values.next() {
328            // If the msg_field is provided, fetch the value of the field from the document data.
329            let log_value = if let Some(msg_field) = msg_field {
330                get_log_value_from_msg_field(document, msg_field)
331            } else {
332                document
333            };
334
335            ensure!(
336                index.is_some() || index_from_url.is_some(),
337                InvalidElasticsearchInputSnafu {
338                    reason: "missing index in bulk request".to_string(),
339                }
340            );
341
342            let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
343            requests.push(PipelineIngestRequest {
344                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
345                values: vec![log_value],
346            });
347        }
348    }
349
350    debug!(
351        "Received {} log ingest requests: {:?}",
352        requests.len(),
353        requests
354    );
355
356    Ok(requests)
357}
358
359// Get the index from the command. We will take index as the table name in GreptimeDB.
360fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
361    if let Some(index) = v.get_mut("_index") {
362        if let Value::String(index) = index.take() {
363            Ok(Some(index))
364        } else {
365            // If the `_index` exists, it should be a string.
366            InvalidElasticsearchInputSnafu {
367                reason: "index is not a string in bulk request".to_string(),
368            }
369            .fail()
370        }
371    } else {
372        Ok(None)
373    }
374}
375
376// If the msg_field is provided, fetch the value of the field from the document data.
377// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
378fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
379    if let Some(message) = v.get_mut(msg_field) {
380        let message = message.take();
381        match message {
382            Value::String(s) => match serde_json::from_str::<Value>(&s) {
383                Ok(s) => s,
384                // If the message is not a valid JSON, return a map with the original message key and value.
385                Err(_) => json!({msg_field: s}),
386            },
387            // If the message is not a string, just use the original message as the log value.
388            _ => message,
389        }
390    } else {
391        // If the msg_field is not found, just use the original message as the log value.
392        v
393    }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_bulk_request() {
        // Each case is (ndjson input, index from URL, msg_field, expected result).
        let test_cases = vec![
            // Normal case.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // URL index is only a fallback: `_index` in the command wins.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Incomplete bulk request: a trailing command without a document
            // is silently ignored.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                ]),
            ),
            // Specify the `data` field as the message field and the value is a JSON string.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Simulate the log data from Logstash.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
                        ],
                    },
                ]),
            ),
            // Empty payload is rejected.
            (
                "",
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "empty bulk request".to_string(),
                }),
            ),
            // Command that is neither `create` nor `index` is rejected.
            // NOTE: only `is_err()` is asserted below; the reason here is a
            // placeholder, not the message the code produces.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's an invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                assert!(requests.is_err());
            }
        }
    }
}