// servers/elasticsearch.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use axum::extract::{Path, Query, State};
19use axum::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
20use axum::response::IntoResponse;
21use axum::Extension;
22use axum_extra::TypedHeader;
23use common_error::ext::ErrorExt;
24use common_telemetry::{debug, error};
25use headers::ContentType;
26use once_cell::sync::Lazy;
27use pipeline::{PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME};
28use serde_json::{json, Deserializer, Value};
29use session::context::{Channel, QueryContext};
30use snafu::{ensure, ResultExt};
31
32use crate::error::{
33    status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
34    Result as ServersResult,
35};
36use crate::http::event::{
37    ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest,
38};
39use crate::metrics::{
40    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
41};
42
43// The headers for every response of Elasticsearch API.
44static ELASTICSEARCH_HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
45    HeaderMap::from_iter([
46        (
47            axum::http::header::CONTENT_TYPE,
48            HeaderValue::from_static("application/json"),
49        ),
50        (
51            HeaderName::from_static("x-elastic-product"),
52            HeaderValue::from_static("Elasticsearch"),
53        ),
54    ])
55});
56
// The fake Elasticsearch version reported by the version endpoint (see
// `handle_get_version`); clients use it only for capability negotiation.
const ELASTICSEARCH_VERSION: &str = "8.16.0";
59
60// Return fake response for Elasticsearch ping request.
61#[axum_macros::debug_handler]
62pub async fn handle_get_version() -> impl IntoResponse {
63    let body = serde_json::json!({
64        "version": {
65            "number": ELASTICSEARCH_VERSION
66        }
67    });
68    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
69}
70
71// Return fake response for Elasticsearch license request.
72// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-license.html.
73#[axum_macros::debug_handler]
74pub async fn handle_get_license() -> impl IntoResponse {
75    let body = serde_json::json!({
76        "license": {
77            "uid": "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
78            "type": "oss",
79            "status": "active",
80            "expiry_date_in_millis": 4891198687000_i64,
81        }
82    });
83    (StatusCode::OK, elasticsearch_headers(), axum::Json(body))
84}
85
/// Process `_bulk` API requests. Only support to create logs.
/// Since no index is present in the URL, every command line in the payload
/// must carry its own `_index` field to name the target table.
/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api(
    State(log_state): State<LogState>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    // Pure delegation: `None` means "no index from the URL path".
    do_handle_bulk_api(log_state, None, params, query_ctx, headers, payload).await
}
99
/// Process `/${index}/_bulk` API requests. Only support to create logs.
/// The `index` path segment supplies a default table name; an `_index` field
/// inside a bulk command still takes precedence over it.
/// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request.
#[axum_macros::debug_handler]
pub async fn handle_bulk_api_with_index(
    State(log_state): State<LogState>,
    Path(index): Path<String>,
    Query(params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContext>,
    TypedHeader(_content_type): TypedHeader<ContentType>,
    headers: HeaderMap,
    payload: String,
) -> impl IntoResponse {
    // Pure delegation with the URL-supplied index as the fallback table name.
    do_handle_bulk_api(log_state, Some(index), params, query_ctx, headers, payload).await
}
114
115async fn do_handle_bulk_api(
116    log_state: LogState,
117    index: Option<String>,
118    params: LogIngesterQueryParams,
119    mut query_ctx: QueryContext,
120    headers: HeaderMap,
121    payload: String,
122) -> impl IntoResponse {
123    let start = Instant::now();
124    debug!(
125        "Received bulk request, params: {:?}, payload: {:?}",
126        params, payload
127    );
128
129    // The `schema` is already set in the query_ctx in auth process.
130    query_ctx.set_channel(Channel::Elasticsearch);
131
132    let db = params.db.unwrap_or_else(|| "public".to_string());
133
134    // Record the ingestion time histogram.
135    let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
136        .with_label_values(&[&db])
137        .start_timer();
138
139    // If pipeline_name is not provided, use the internal pipeline.
140    let pipeline_name = if let Some(pipeline) = params.pipeline_name {
141        pipeline
142    } else {
143        GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
144    };
145
146    // Read the ndjson payload and convert it to a vector of Value.
147    let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
148        Ok(requests) => requests,
149        Err(e) => {
150            return (
151                StatusCode::BAD_REQUEST,
152                elasticsearch_headers(),
153                axum::Json(write_bulk_response(
154                    start.elapsed().as_millis() as i64,
155                    0,
156                    StatusCode::BAD_REQUEST.as_u16() as u32,
157                    e.to_string().as_str(),
158                )),
159            );
160        }
161    };
162    let log_num = requests.len();
163
164    let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
165        Ok(pipeline) => pipeline,
166        Err(e) => {
167            // should be unreachable
168            error!(e; "Failed to ingest logs");
169            return (
170                status_code_to_http_status(&e.status_code()),
171                elasticsearch_headers(),
172                axum::Json(write_bulk_response(
173                    start.elapsed().as_millis() as i64,
174                    0,
175                    e.status_code() as u32,
176                    e.to_string().as_str(),
177                )),
178            );
179        }
180    };
181    if let Err(e) = ingest_logs_inner(
182        log_state.log_handler,
183        pipeline,
184        requests,
185        Arc::new(query_ctx),
186        headers,
187    )
188    .await
189    {
190        error!(e; "Failed to ingest logs");
191        return (
192            status_code_to_http_status(&e.status_code()),
193            elasticsearch_headers(),
194            axum::Json(write_bulk_response(
195                start.elapsed().as_millis() as i64,
196                0,
197                e.status_code() as u32,
198                e.to_string().as_str(),
199            )),
200        );
201    }
202
203    // Record the number of documents ingested.
204    METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT
205        .with_label_values(&[&db])
206        .inc_by(log_num as u64);
207
208    (
209        StatusCode::OK,
210        elasticsearch_headers(),
211        axum::Json(write_bulk_response(
212            start.elapsed().as_millis() as i64,
213            log_num,
214            StatusCode::CREATED.as_u16() as u32,
215            "",
216        )),
217    )
218}
219
220// It will generate the following response when write _bulk request to GreptimeDB successfully:
221// {
222//     "took": 1000,
223//     "errors": false,
224//     "items": [
225//         { "create": { "status": 201 } },
226//         { "create": { "status": 201 } },
227//         ...
228//     ]
229// }
230// If the status code is not 201, it will generate the following response:
231// {
232//     "took": 1000,
233//     "errors": true,
234//     "items": [
235//         { "create": { "status": 400, "error": { "type": "illegal_argument_exception", "reason": "<error_reason>" } } }
236//     ]
237// }
238fn write_bulk_response(took_ms: i64, n: usize, status_code: u32, error_reason: &str) -> Value {
239    if error_reason.is_empty() {
240        let items: Vec<Value> = (0..n)
241            .map(|_| {
242                json!({
243                    "create": {
244                        "status": status_code
245                    }
246                })
247            })
248            .collect();
249        json!({
250            "took": took_ms,
251            "errors": false,
252            "items": items,
253        })
254    } else {
255        json!({
256            "took": took_ms,
257            "errors": true,
258            "items": [
259                { "create": { "status": status_code, "error": { "type": "illegal_argument_exception", "reason": error_reason } } }
260            ]
261        })
262    }
263}
264
265/// Returns the headers for every response of Elasticsearch API.
266pub fn elasticsearch_headers() -> HeaderMap {
267    ELASTICSEARCH_HEADERS.clone()
268}
269
270// Parse the Elasticsearch bulk request and convert it to multiple LogIngestRequests.
271// The input will be Elasticsearch bulk request in NDJSON format.
272// For example, the input will be like this:
273// { "index" : { "_index" : "test", "_id" : "1" } }
274// { "field1" : "value1" }
275// { "index" : { "_index" : "test", "_id" : "2" } }
276// { "field2" : "value2" }
277fn parse_bulk_request(
278    input: &str,
279    index_from_url: &Option<String>,
280    msg_field: &Option<String>,
281) -> ServersResult<Vec<PipelineIngestRequest>> {
282    // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
283    let values: Vec<Value> = Deserializer::from_str(input)
284        .into_iter::<Value>()
285        .collect::<Result<_, _>>()
286        .context(ParseJsonSnafu)?;
287
288    // Check if the input is empty.
289    ensure!(
290        !values.is_empty(),
291        InvalidElasticsearchInputSnafu {
292            reason: "empty bulk request".to_string(),
293        }
294    );
295
296    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
297    let mut values = values.into_iter();
298
299    // Read the ndjson payload and convert it to a (index, value) vector.
300    // For Elasticsearch post `_bulk` API, each chunk contains two objects:
301    //   1. The first object is the command, it should be `create` or `index`.
302    //   2. The second object is the document data.
303    while let Some(mut cmd) = values.next() {
304        // NOTE: Although the native Elasticsearch API supports upsert in `index` command, we don't support change any data in `index` command and it's same as `create` command.
305        let index = if let Some(cmd) = cmd.get_mut("create") {
306            get_index_from_cmd(cmd.take())?
307        } else if let Some(cmd) = cmd.get_mut("index") {
308            get_index_from_cmd(cmd.take())?
309        } else {
310            return InvalidElasticsearchInputSnafu {
311                reason: format!(
312                    "invalid bulk request, expected 'create' or 'index' but got {:?}",
313                    cmd
314                ),
315            }
316            .fail();
317        };
318
319        // Read the second object to get the document data. Stop the loop if there is no document.
320        if let Some(document) = values.next() {
321            // If the msg_field is provided, fetch the value of the field from the document data.
322            let log_value = if let Some(msg_field) = msg_field {
323                get_log_value_from_msg_field(document, msg_field)
324            } else {
325                document
326            };
327
328            ensure!(
329                index.is_some() || index_from_url.is_some(),
330                InvalidElasticsearchInputSnafu {
331                    reason: "missing index in bulk request".to_string(),
332                }
333            );
334
335            let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
336            requests.push(PipelineIngestRequest {
337                table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
338                values: vec![log_value],
339            });
340        }
341    }
342
343    debug!(
344        "Received {} log ingest requests: {:?}",
345        requests.len(),
346        requests
347    );
348
349    Ok(requests)
350}
351
352// Get the index from the command. We will take index as the table name in GreptimeDB.
353fn get_index_from_cmd(mut v: Value) -> ServersResult<Option<String>> {
354    if let Some(index) = v.get_mut("_index") {
355        if let Value::String(index) = index.take() {
356            Ok(Some(index))
357        } else {
358            // If the `_index` exists, it should be a string.
359            InvalidElasticsearchInputSnafu {
360                reason: "index is not a string in bulk request".to_string(),
361            }
362            .fail()
363        }
364    } else {
365        Ok(None)
366    }
367}
368
369// If the msg_field is provided, fetch the value of the field from the document data.
370// For example, if the `msg_field` is `message`, and the document data is `{"message":"hello"}`, the log value will be Value::String("hello").
371fn get_log_value_from_msg_field(mut v: Value, msg_field: &str) -> Value {
372    if let Some(message) = v.get_mut(msg_field) {
373        let message = message.take();
374        match message {
375            Value::String(s) => match serde_json::from_str::<Value>(&s) {
376                Ok(s) => s,
377                // If the message is not a valid JSON, return a map with the original message key and value.
378                Err(_) => json!({msg_field: s}),
379            },
380            // If the message is not a string, just use the original message as the log value.
381            _ => message,
382        }
383    } else {
384        // If the msg_field is not found, just use the original message as the log value.
385        v
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    // Table-driven test for `parse_bulk_request`: each case is
    // (payload, index_from_url, msg_field, expected).
    #[test]
    fn test_parse_bulk_request() {
        let test_cases = vec![
            // Normal case: index taken from each command's `_index`.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                None,
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Case with index from the URL: the command-level `_index` still
            // wins for the first pair; the second pair matches the URL index.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // NOTE(review): this case is byte-identical to the previous one —
            // consider removing it or varying its input.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                {"foo2":"foo2_value","bar2":"bar2_value"}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "logs".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Incomplete bulk request: a trailing command with no document is
            // silently dropped; earlier pairs are still returned.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"foo1":"foo1_value", "bar1":"bar1_value"}
                {"create":{"_index":"logs","_id":"2"}}
                "#,
                Some("logs".to_string()),
                None,
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                ]),
            ),
            // Specify the `data` field as the message field; its value is a
            // JSON string, so the parsed JSON replaces the whole document.
            (
                r#"
                {"create":{"_index":"test","_id":"1"}}
                {"data":"{\"foo1\":\"foo1_value\", \"bar1\":\"bar1_value\"}", "not_data":"not_data_value"}
                {"create":{"_index":"test","_id":"2"}}
                {"data":"{\"foo2\":\"foo2_value\", \"bar2\":\"bar2_value\"}", "not_data":"not_data_value"}
                "#,
                None,
                Some("data".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                    },
                    PipelineIngestRequest {
                        table: "test".to_string(),
                        values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                    },
                ]),
            ),
            // Simulate the log data from Logstash: `message` is a plain string
            // (not JSON), so it is re-wrapped under the `message` key.
            (
                r#"
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\"","@timestamp":"2025-01-04T04:32:13.868962186Z","event":{"original":"172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                {"create":{"_id":null,"_index":"logs-generic-default","routing":null}}
                {"message":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\"","@timestamp":"2025-01-04T04:32:13.868723810Z","event":{"original":"10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""},"host":{"name":"orbstack"},"log":{"file":{"path":"/var/log/nginx/access.log"}},"@version":"1","data_stream":{"type":"logs","dataset":"generic","namespace":"default"}}
                "#,
                None,
                Some("message".to_string()),
                Ok(vec![
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                        ],
                    },
                    PipelineIngestRequest {
                        table: "logs-generic-default".to_string(),
                        values: vec![
                            pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
                        ],
                    },
                ]),
            ),
            // Invalid bulk request: command is neither `create` nor `index`.
            // NOTE(review): only `is_err()` is asserted below — the `reason`
            // string in this expectation is a placeholder and never compared.
            (
                r#"
                { "not_create_or_index" : { "_index" : "test", "_id" : "1" } }
                { "foo1" : "foo1_value", "bar1" : "bar1_value" }
                "#,
                None,
                None,
                Err(InvalidElasticsearchInputSnafu {
                    reason: "it's a invalid bulk request".to_string(),
                }),
            ),
        ];

        for (input, index, msg_field, expected) in test_cases {
            let requests = parse_bulk_request(input, &index, &msg_field);
            if expected.is_ok() {
                assert_eq!(requests.unwrap(), expected.unwrap());
            } else {
                // Error cases only check that parsing failed, not the message.
                assert!(requests.is_err());
            }
        }
    }
}