servers/
elasticsearch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::{Path, Query, State};
20use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
21use axum::response::IntoResponse;
22use axum::Extension;
23use axum_extra::TypedHeader;
24use common_error::ext::ErrorExt;
25use common_telemetry::{debug, error};
26use headers::ContentType;
27use once_cell::sync::Lazy;
28use pipeline::{
29    GreptimePipelineParams, PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
30};
31use serde_json::{json, Deserializer, Value};
32use session::context::{Channel, QueryContext};
33use snafu::{ensure, ResultExt};
34use vrl::value::Value as VrlValue;
35
36use crate::error::{
37    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu,
38    Result as ServersResult,
39};
40use crate::http::event::{
41    extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
42    PipelineIngestRequest,
43};
44use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
45use crate::metrics::{
46    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
47};
48
49// The headers for every response of Elasticsearch API.
50static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
51    HeaderMap::from_iter([
52        (
53            axum::http::header::CONTENT_TYPE,
54            HeaderValue::from_static("application/json"),
55        ),
56        (
57            HeaderName::from_static("x-elastic-product"),
58            HeaderValue::from_static("Elasticsearch"),
59        ),
60    ])
61});
62
// The fake Elasticsearch server version reported by `handle_get_version`
// (the ping/version probe sent by Elasticsearch clients).
const ELASTICSEARCH_VERSION: &str = "8.16.0";
65
66// Return fake response for Elasticsearch ping request.
67#[axum_macros::debug_handler]
68pub async fn handle_get_version() -> impl IntoResponse {
69    let body = serde_json::json!({
70        "version": {
71            "number": ELASTICSEARCH_VERSION
72        }
73    });
74    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
75}
76
77// Return fake response for Elasticsearch license request.
78// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
79#[axum_macros::debug_handler]
80pub async fn handle_get_license() -> impl IntoResponse {
81    let body = serde_json::json!({
82        "license": {
83            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
84            "type": "oss",
85            "status": "active",
86            "expiry_date_in_millis": 4891198687000_i64,
87        }
88    });
89    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
90}
91
92/// Process `_bulk` API requests. Only support to create logs.
93/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
94#[axum_macros::debug_handler]
95pub async fn handle_bulk_api(
96    State(log_state): State<LogState>,
97    Query(params): Query<LogIngesterQueryParams>,
98    Extension(query_ctx): Extension<QueryContext>,
99    TypedHeader(_content_type): TypedHeader<ContentType>,
100    headers: HeaderMap,
101    payload: String,
102) -> impl IntoResponse {
103    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
104}
105
106/// Process `/${index}/_bulk` API requests. Only support to create logs.
107/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
108#[axum_macros::debug_handler]
109pub async fn handle_bulk_api_with_index(
110    State(log_state): State<LogState>,
111    Path(index): Path<String>,
112    Query(params): Query<LogIngesterQueryParams>,
113    Extension(query_ctx): Extension<QueryContext>,
114    TypedHeader(_content_type): TypedHeader<ContentType>,
115    headers: HeaderMap,
116    payload: String,
117) -> impl IntoResponse {
118    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
119}
120
121async fn do_handle_bulk_api(
122    log_state: LogState,
123    index: Option<String>,
124    params: LogIngesterQueryParams,
125    mut query_ctx: QueryContext,
126    headers: HeaderMap,
127    payload: String,
128) -> impl IntoResponse {
129    let start = Instant::now();
130    debug!(
131        "Received bulk request, params: {:?}, payload: {:?}",
132        params, payload
133    );
134
135    // The `schema` is already set in the query_ctx in auth process.
136    query_ctx.set_channel(Channel::Elasticsearch);
137
138    let db = query_ctx.current_schema();
139
140    // Record the ingestion time histogram.
141    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
142        .with_label_values(&[&db])
143        .start_timer();
144
145    // If pipeline_name is not provided, use the internal pipeline.
146    let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
147        headers
148            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
149            .and_then(|v| v.to_str().ok())
150            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
151    });
152
153    // Read the ndjson payload and convert it to a vector of Value.
154    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
155        Ok(requests) => requests,
156        Err(e) => {
157            return (
158                StatusCode::BAD_REQUEST,
159                elasticsearch_headers(),
160                axum::Json(write_bulk_response(
161                    start.elapsed().as_millis() as i64,
162                    0,
163                    StatusCode::BAD_REQUEST.as_u16() as u32,
164                    e.to_string().as_str(),
165                )),
166            );
167        }
168    };
169    let log_num = requests.len();
170
171    let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
172        Ok(pipeline) => pipeline,
173        Err(e) => {
174            // should be unreachable
175            error!(e; "Failed to ingest logs");
176            return (
177                status_code_to_http_status(&e.status_code()),
178                elasticsearch_headers(),
179                axum::Json(write_bulk_response(
180                    start.elapsed().as_millis() as i64,
181                    0,
182                    e.status_code() as u32,
183                    e.to_string().as_str(),
184                )),
185            );
186        }
187    };
188    let pipeline_params =
189        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
190    if let Err(e) = ingest_logs_inner(
191        log_state.log_handler,
192        pipeline,
193        requests,
194        Arc::new(query_ctx),
195        pipeline_params,
196    )
197    .await
198    {
199        error!(e; "Failed to ingest logs");
200        return (
201            status_code_to_http_status(&e.status_code()),
202            elasticsearch_headers(),
203            axum::Json(write_bulk_response(
204                start.elapsed().as_millis() as i64,
205                0,
206                e.status_code() as u32,
207                e.to_string().as_str(),
208            )),
209        );
210    }
211
212    // Record the number of documents ingested.
213    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
214        .with_label_values(&[&db])
215        .inc_by(log_num as u64);
216
217    (
218        StatusCode::OK,
219        elasticsearch_headers(),
220        axum::Json(write_bulk_response(
221            start.elapsed().as_millis() as i64,
222            log_num,
223            StatusCode::CREATED.as_u16() as u32,
224            "",
225        )),
226    )
227}
228
229// It will generate the following response when write _bulk request to GreptimeDB successfully:
230// {
231//     "took": 1000,
232//     "errors": false,
233//     "items": [
234//         { "create": { "status": 201 } },
235//         { "create": { "status": 201 } },
236//         ...
237//     ]
238// }
239// If the status code is not 201, it will generate the following response:
240// {
241//     "took": 1000,
242//     "errors": true,
243//     "items": [
244//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
245//     ]
246// }
247fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
248    if error_reason.is_empty() {
249        let items: Vec<Value> = (0..n)
250            .map(|_| {
251                json!({
252                    "create": {
253                        "status": status_code
254                    }
255                })
256            })
257            .collect();
258        json!({
259            "took": took_ms,
260            "errors": false,
261            "items": items,
262        })
263    } else {
264        json!({
265            "took": took_ms,
266            "errors": true,
267            "items": [
268                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
269            ]
270        })
271    }
272}
273
274/// Returns the headers for every response of Elasticsearch API.
275pub fn elasticsearch_headers() -> HeaderMap {
276    ELASTICSEARCH_HEADERS.clone()
277}
278
279// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
280// The input will be Elasticsearch bulk request in NDJSON format.
281// For example, the input will be like this:
282// { "index" : { "_index" : "test", "_id" : "1" } }
283// { "field1" : "value1" }
284// { "index" : { "_index" : "test", "_id" : "2" } }
285// { "field2" : "value2" }
286fn parse_bulk_request(
287    input: &str,
288    index_from_url: &Option<String>,
289    msg_field: &Option<String>,
290) -> ServersResult<Vec<PipelineIngestRequest>> {
291    // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
292    let values: Vec<VrlValue> = Deserializer::from_str(input)
293        .into_iter::<VrlValue>()
294        .collect::<Result<_, _>>()
295        .context(ParseJsonSnafu)?;
296
297    // Check if the input is empty.
298    ensure!(
299        !values.is_empty(),
300        InvalidElasticsearchInputSnafu {
301            reason: "empty bulk request".to_string(),
302        }
303    );
304
305    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
306    let mut values = values.into_iter();
307
308    // Read the ndjson payload and convert it to a (index, value) vector.
309    // For Elasticsearch post `_bulk` API, each chunk contains two objects:
310    //   1. The first object is the command, it should be `create` or `index`.
311    //   2. The second object is the document data.
312    while let Some(cmd) = values.next() {
313        // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
314        let mut cmd = cmd.into_object();
315        let index = if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("create")) {
316            get_index_from_cmd(cmd)?
317        } else if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("index")) {
318            get_index_from_cmd(cmd)?
319        } else {
320            return InvalidElasticsearchInputSnafu {
321                reason: format!(
322                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
323                    cmd
324                ),
325            }
326            .fail();
327        };
328
329        // Read the second object to get the document data. Stop the loop if there is no document.
330        if let Some(document) = values.next() {
331            // If the msg_field is provided, fetch the value of the field from the document data.
332            let log_value = if let Some(msg_field) = msg_field {
333                get_log_value_from_msg_field(document, msg_field)
334            } else {
335                document
336            };
337
338            ensure!(
339                index.is_some() || index_from_url.is_some(),
340                InvalidElasticsearchInputSnafu {
341                    reason: "missing index in bulk request".to_string(),
342                }
343            );
344
345            requests.push(PipelineIngestRequest {
346                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
347                values: vec![log_value],
348            });
349        }
350    }
351
352    debug!(
353        "Received {} log ingest requests: {:?}",
354        requests.len(),
355        requests
356    );
357
358    Ok(requests)
359}
360
361// Get the index from the command. We will take index as the table name in GreptimeDB.
362fn get_index_from_cmd(v: VrlValue) -> ServersResult<Option<String>> {
363    let Some(index) = v.into_object().and_then(|mut m| m.remove("_index")) else {
364        return Ok(None);
365    };
366
367    if let VrlValue::Bytes(index) = index {
368        Ok(Some(String::from_utf8_lossy(&index).to_string()))
369    } else {
370        // If the `_index` exists, it should be a string.
371        InvalidElasticsearchInputSnafu {
372            reason: "index is not a string in bulk request",
373        }
374        .fail()
375    }
376}
377
378// If the msg_field is provided, fetch the value of the field from the document data.
379// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
380fn get_log_value_from_msg_field(v: VrlValue, msg_field: &str) -> VrlValue {
381    let VrlValue::Object(mut m) = v else {
382        return v;
383    };
384
385    if let Some(message) = m.remove(msg_field) {
386        match message {
387            VrlValue::Bytes(bytes) => {
388                match serde_json::from_slice::<VrlValue>(&bytes) {
389                    Ok(v) => v,
390                    // If the message is not a valid JSON, return a map with the original message key and value.
391                    Err(_) => {
392                        let map = BTreeMap::from([(
393                            msg_field.to_string().into(),
394                            VrlValue::Bytes(bytes),
395                        )]);
396                        VrlValue::Object(map)
397                    }
398                }
399            }
400            // If the message is not a string, just use the original message as the log value.
401            _ => message,
402        }
403    } else {
404        // If the msg_field is not found, just use the original message as the log value.
405        VrlValue::Object(m)
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    // Covers: plain pairs, URL-index fallback, incomplete trailing command,
    // msg_field extraction (JSON string and Logstash payloads), and rejection
    // of unknown commands.
    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
            // Normal case.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Case with index: the command's `_index` wins over the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Case with incomplete bulk request: trailing command without a
            // document is silently dropped.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Specify the `data` field as the message field and the value is a JSON string.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Simulate the log data from Logstash.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""}).into(),
                        ],
                    },
                ]),
            ),
            // With invalid bulk request: the command is neither `create` nor
            // `index`. Only the Err-ness is asserted below, not the reason.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                assert!(requests.is_err());
            }
        }
    }
}