Skip to main content

servers/
elasticsearch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::Extension;
20use axum::extract::{Path, Query, State};
21use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
22use axum::response::IntoResponse;
23use axum_extra::TypedHeader;
24use common_error::ext::ErrorExt;
25use common_telemetry::{debug, error};
26use headers::ContentType;
27use once_cell::sync::Lazy;
28use pipeline::{
29    GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GreptimePipelineParams, PipelineDefinition,
30};
31use serde_json::{Deserializer, Value, json};
32use session::context::{Channel, QueryContext};
33use snafu::{ResultExt, ensure};
34use table::requests::{
35    SEMANTIC_SIGNAL_TYPE, SEMANTIC_SOURCE, SIGNAL_TYPE_LOG, SOURCE_ELASTICSEARCH,
36};
37use vrl::value::Value as VrlValue;
38
39use crate::error::{
40    InvalidElasticsearchInputSnafu, ParseJsonSnafu, Result as ServersResult,
41    status_code_to_http_status,
42};
43use crate::http::event::{
44    LogIngesterQueryParams, LogState, PipelineIngestRequest,
45    extract_pipeline_params_map_from_headers, ingest_logs_inner,
46};
47use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
48use crate::metrics::{
49    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
50};
51
52// The headers for every response of Elasticsearch API.
53static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
54    HeaderMap::from_iter([
55        (
56            axum::http::header::CONTENT_TYPE,
57            HeaderValue::from_static("application/json"),
58        ),
59        (
60            HeaderName::from_static("x-elastic-product"),
61            HeaderValue::from_static("Elasticsearch"),
62        ),
63    ])
64});
65
66// The fake version of Elasticsearch and used for `_version` API.
67const ELASTICSEARCH_VERSION: &str = "8.16.0";
68
69// Return fake response for Elasticsearch ping request.
70#[axum_macros::debug_handler]
71pub async fn handle_get_version() -> impl IntoResponse {
72    let body = serde_json::json!({
73        "version": {
74            "number": ELASTICSEARCH_VERSION
75        }
76    });
77    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
78}
79
80// Return fake response for Elasticsearch license request.
81// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
82#[axum_macros::debug_handler]
83pub async fn handle_get_license() -> impl IntoResponse {
84    let body = serde_json::json!({
85        "license": {
86            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
87            "type": "oss",
88            "status": "active",
89            "expiry_date_in_millis": 4891198687000_i64,
90        }
91    });
92    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
93}
94
95/// Process `_bulk` API requests. Only support to create logs.
96/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
97#[axum_macros::debug_handler]
98pub async fn handle_bulk_api(
99    State(log_state): State<LogState>,
100    Query(params): Query<LogIngesterQueryParams>,
101    Extension(query_ctx): Extension<QueryContext>,
102    TypedHeader(_content_type): TypedHeader<ContentType>,
103    headers: HeaderMap,
104    payload: String,
105) -> impl IntoResponse {
106    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
107}
108
109/// Process `/${index}/_bulk` API requests. Only support to create logs.
110/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
111#[axum_macros::debug_handler]
112pub async fn handle_bulk_api_with_index(
113    State(log_state): State<LogState>,
114    Path(index): Path<String>,
115    Query(params): Query<LogIngesterQueryParams>,
116    Extension(query_ctx): Extension<QueryContext>,
117    TypedHeader(_content_type): TypedHeader<ContentType>,
118    headers: HeaderMap,
119    payload: String,
120) -> impl IntoResponse {
121    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
122}
123
124async fn do_handle_bulk_api(
125    log_state: LogState,
126    index: Option<String>,
127    params: LogIngesterQueryParams,
128    mut query_ctx: QueryContext,
129    headers: HeaderMap,
130    payload: String,
131) -> impl IntoResponse {
132    let start = Instant::now();
133    debug!(
134        "Received bulk request, params: {:?}, payload: {:?}",
135        params, payload
136    );
137
138    // The `schema` is already set in the query_ctx in auth process.
139    query_ctx.set_channel(Channel::Elasticsearch);
140    query_ctx.set_extension(SEMANTIC_SIGNAL_TYPE, SIGNAL_TYPE_LOG);
141    query_ctx.set_extension(SEMANTIC_SOURCE, SOURCE_ELASTICSEARCH);
142
143    let db = query_ctx.current_schema();
144
145    // Record the ingestion time histogram.
146    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
147        .with_label_values(&[&db])
148        .start_timer();
149
150    // If pipeline_name is not provided, use the internal pipeline.
151    let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
152        headers
153            .get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
154            .and_then(|v| v.to_str().ok())
155            .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
156    });
157
158    // Read the ndjson payload and convert it to a vector of Value.
159    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
160        Ok(requests) => requests,
161        Err(e) => {
162            return (
163                StatusCode::BAD_REQUEST,
164                elasticsearch_headers(),
165                axum::Json(write_bulk_response(
166                    start.elapsed().as_millis() as i64,
167                    0,
168                    StatusCode::BAD_REQUEST.as_u16() as u32,
169                    e.to_string().as_str(),
170                )),
171            );
172        }
173    };
174    let log_num = requests.len();
175
176    let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
177        Ok(pipeline) => pipeline,
178        Err(e) => {
179            // should be unreachable
180            error!(e; "Failed to ingest logs");
181            return (
182                status_code_to_http_status(&e.status_code()),
183                elasticsearch_headers(),
184                axum::Json(write_bulk_response(
185                    start.elapsed().as_millis() as i64,
186                    0,
187                    e.status_code() as u32,
188                    e.to_string().as_str(),
189                )),
190            );
191        }
192    };
193    let pipeline_params =
194        GreptimePipelineParams::from_map(extract_pipeline_params_map_from_headers(&headers));
195    if let Err(e) = ingest_logs_inner(
196        log_state.log_handler,
197        pipeline,
198        requests,
199        Arc::new(query_ctx),
200        pipeline_params,
201    )
202    .await
203    {
204        error!(e; "Failed to ingest logs");
205        return (
206            status_code_to_http_status(&e.status_code()),
207            elasticsearch_headers(),
208            axum::Json(write_bulk_response(
209                start.elapsed().as_millis() as i64,
210                0,
211                e.status_code() as u32,
212                e.to_string().as_str(),
213            )),
214        );
215    }
216
217    // Record the number of documents ingested.
218    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
219        .with_label_values(&[&db])
220        .inc_by(log_num as u64);
221
222    (
223        StatusCode::OK,
224        elasticsearch_headers(),
225        axum::Json(write_bulk_response(
226            start.elapsed().as_millis() as i64,
227            log_num,
228            StatusCode::CREATED.as_u16() as u32,
229            "",
230        )),
231    )
232}
233
234// It will generate the following response when write _bulk request to GreptimeDB successfully:
235// {
236//     "took": 1000,
237//     "errors": false,
238//     "items": [
239//         { "create": { "status": 201 } },
240//         { "create": { "status": 201 } },
241//         ...
242//     ]
243// }
244// If the status code is not 201, it will generate the following response:
245// {
246//     "took": 1000,
247//     "errors": true,
248//     "items": [
249//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
250//     ]
251// }
252fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
253    if error_reason.is_empty() {
254        let items: Vec<Value> = (0..n)
255            .map(|_| {
256                json!({
257                    "create": {
258                        "status": status_code
259                    }
260                })
261            })
262            .collect();
263        json!({
264            "took": took_ms,
265            "errors": false,
266            "items": items,
267        })
268    } else {
269        json!({
270            "took": took_ms,
271            "errors": true,
272            "items": [
273                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
274            ]
275        })
276    }
277}
278
279/// Returns the headers for every response of Elasticsearch API.
280pub fn elasticsearch_headers() -> HeaderMap {
281    ELASTICSEARCH_HEADERS.clone()
282}
283
284// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
285// The input will be Elasticsearch bulk request in NDJSON format.
286// For example, the input will be like this:
287// { "index" : { "_index" : "test", "_id" : "1" } }
288// { "field1" : "value1" }
289// { "index" : { "_index" : "test", "_id" : "2" } }
290// { "field2" : "value2" }
291fn parse_bulk_request(
292    input: &str,
293    index_from_url: &Option<String>,
294    msg_field: &Option<String>,
295) -> ServersResult<Vec<PipelineIngestRequest>> {
296    // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
297    let values: Vec<VrlValue> = Deserializer::from_str(input)
298        .into_iter::<VrlValue>()
299        .collect::<Result<_, _>>()
300        .context(ParseJsonSnafu)?;
301
302    // Check if the input is empty.
303    ensure!(
304        !values.is_empty(),
305        InvalidElasticsearchInputSnafu {
306            reason: "empty bulk request".to_string(),
307        }
308    );
309
310    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
311    let mut values = values.into_iter();
312
313    // Read the ndjson payload and convert it to a (index, value) vector.
314    // For Elasticsearch post `_bulk` API, each chunk contains two objects:
315    //   1. The first object is the command, it should be `create` or `index`.
316    //   2. The second object is the document data.
317    while let Some(cmd) = values.next() {
318        // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
319        let mut cmd = cmd.into_object();
320        let index = if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("create")) {
321            get_index_from_cmd(cmd)?
322        } else if let Some(cmd) = cmd.as_mut().and_then(|c| c.remove("index")) {
323            get_index_from_cmd(cmd)?
324        } else {
325            return InvalidElasticsearchInputSnafu {
326                reason: format!(
327                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
328                    cmd
329                ),
330            }
331            .fail();
332        };
333
334        // Read the second object to get the document data. Stop the loop if there is no document.
335        if let Some(document) = values.next() {
336            // If the msg_field is provided, fetch the value of the field from the document data.
337            let log_value = if let Some(msg_field) = msg_field {
338                get_log_value_from_msg_field(document, msg_field)
339            } else {
340                document
341            };
342
343            ensure!(
344                index.is_some() || index_from_url.is_some(),
345                InvalidElasticsearchInputSnafu {
346                    reason: "missing index in bulk request".to_string(),
347                }
348            );
349
350            requests.push(PipelineIngestRequest {
351                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
352                values: vec![log_value],
353            });
354        }
355    }
356
357    debug!(
358        "Received {} log ingest requests: {:?}",
359        requests.len(),
360        requests
361    );
362
363    Ok(requests)
364}
365
366// Get the index from the command. We will take index as the table name in GreptimeDB.
367fn get_index_from_cmd(v: VrlValue) -> ServersResult<Option<String>> {
368    let Some(index) = v.into_object().and_then(|mut m| m.remove("_index")) else {
369        return Ok(None);
370    };
371
372    if let VrlValue::Bytes(index) = index {
373        Ok(Some(String::from_utf8_lossy(&index).to_string()))
374    } else {
375        // If the `_index` exists, it should be a string.
376        InvalidElasticsearchInputSnafu {
377            reason: "index is not a string in bulk request",
378        }
379        .fail()
380    }
381}
382
383// If the msg_field is provided, fetch the value of the field from the document data.
384// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
385fn get_log_value_from_msg_field(v: VrlValue, msg_field: &str) -> VrlValue {
386    let VrlValue::Object(mut m) = v else {
387        return v;
388    };
389
390    if let Some(message) = m.remove(msg_field) {
391        match message {
392            VrlValue::Bytes(bytes) => {
393                match serde_json::from_slice::<VrlValue>(&bytes) {
394                    Ok(v) => v,
395                    // If the message is not a valid JSON, return a map with the original message key and value.
396                    Err(_) => {
397                        let map = BTreeMap::from([(
398                            msg_field.to_string().into(),
399                            VrlValue::Bytes(bytes),
400                        )]);
401                        VrlValue::Object(map)
402                    }
403                }
404            }
405            // If the message is not a string, just use the original message as the log value.
406            _ => message,
407        }
408    } else {
409        // If the msg_field is not found, just use the original message as the log value.
410        VrlValue::Object(m)
411    }
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_bulk_request() {
        // Each case is (input, index from URL, msg_field, expected result). For error cases only
        // the error-ness is asserted; `reason` documents the expected failure.
        let test_cases = vec![
            // Normal case.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // The `_index` of each action takes precedence over the index from the URL.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // `index` actions without `_index` fall back to the index from the URL.
            (
                r#"
                {"index":{"_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"index":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Case with incomplete bulk request: the trailing action without a document is ignored.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Specify the `data` field as the message field and the value is a JSON string.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo1": "foo1_value", "bar1": "bar1_value"}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            json!({"foo2": "foo2_value", "bar2": "bar2_value"}).into(),
                        ],
                    },
                ]),
            ),
            // Simulate the log data from Logstash.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""}).into(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""}).into(),
                        ],
                    },
                ]),
            ),
            // With invalid bulk request: the action is neither `create` nor `index`.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "invalid bulk request, expected 'create' or 'index'".to_string(),
                }),
            ),
            // Missing index in both the action and the URL.
            (
                r#"
                {"create":{"_id":"1"}}
                {"foo1":"foo1_value"}
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "missing index in bulk request".to_string(),
                }),
            ),
            // `_index` that is not a string.
            (
                r#"
                {"create":{"_index":1,"_id":"1"}}
                {"foo1":"foo1_value"}
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "index is not a string in bulk request".to_string(),
                }),
            ),
            // Empty (whitespace-only) bulk request.
            (
                "\n   \n",
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "empty bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            match expected {
                Ok(expected) => assert_eq!(requests.unwrap(), expected, "input: {input}"),
                Err(_) => assert!(requests.is_err(), "expected an error for input: {input}"),
            }
        }
    }
}