1use std::collections::HashMap as StdHashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
21 RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
22};
23use jsonb::{Number as JsonbNumber, Value as JsonbValue};
24use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
25use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
26use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
27use pipeline::{
28 ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
29};
30use serde_json::{Map, Value};
31use session::context::QueryContextRef;
32use snafu::{ensure, ResultExt};
33
34use crate::error::{
35 IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
36 UnsupportedJsonDataTypeForTagSnafu,
37};
38use crate::http::event::PipelineIngestRequest;
39use crate::otlp::trace::attributes::OtlpAnyValue;
40use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
41use crate::pipeline::run_pipeline;
42use crate::query_handler::PipelineHandlerRef;
43
/// Default table name for OpenTelemetry logs.
pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
45
/// Converts an OTLP `ExportLogsServiceRequest` into gRPC row-insert requests.
///
/// Two ingestion paths are supported:
/// * `PipelineWay::OtlpLogDirect` — maps log records directly into rows using
///   the built-in OTLP logs schema plus any columns selected via `SelectInfo`.
/// * `PipelineWay::Pipeline` — converts records to JSON maps and runs them
///   through the configured pipeline via `run_pipeline`.
///
/// Any other `PipelineWay` variant returns a `NotSupported` error.
pub async fn to_grpc_insert_requests(
    request: ExportLogsServiceRequest,
    pipeline: PipelineWay,
    pipeline_params: GreptimePipelineParams,
    table_name: String,
    query_ctx: &QueryContextRef,
    pipeline_handler: PipelineHandlerRef,
) -> Result<ContextReq> {
    match pipeline {
        PipelineWay::OtlpLogDirect(select_info) => {
            // Direct path: one RowInsertRequest built from the identity schema.
            let rows = parse_export_logs_service_request_to_rows(request, select_info)?;
            let insert_request = RowInsertRequest {
                rows: Some(rows),
                table_name,
            };

            Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
        }
        PipelineWay::Pipeline(pipeline_def) => {
            // Pipeline path: flatten records to JSON values, then let the
            // pipeline engine transform them into rows.
            let data = parse_export_logs_service_request(request);
            let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;

            let pipeline_ctx =
                PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
            run_pipeline(
                &pipeline_handler,
                &pipeline_ctx,
                PipelineIngestRequest {
                    table: table_name,
                    values: array,
                },
                query_ctx,
                true,
            )
            .await
        }
        _ => NotSupportedSnafu {
            feat: "Unsupported pipeline for logs",
        }
        .fail(),
    }
}
95
96fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (Value, Value, Value) {
97 scope
98 .map(|x| {
99 (
100 Value::Object(key_value_to_map(x.attributes)),
101 Value::String(x.version),
102 Value::String(x.name),
103 )
104 })
105 .unwrap_or((Value::Null, Value::Null, Value::Null))
106}
107
108fn scope_to_jsonb(
109 scope: Option<InstrumentationScope>,
110) -> (JsonbValue<'static>, Option<String>, Option<String>) {
111 scope
112 .map(|x| {
113 (
114 key_value_to_jsonb(x.attributes),
115 Some(x.version),
116 Some(x.name),
117 )
118 })
119 .unwrap_or((JsonbValue::Null, None, None))
120}
121
122fn log_to_pipeline_value(
123 log: LogRecord,
124 resource_schema_url: Value,
125 resource_attr: Value,
126 scope_schema_url: Value,
127 scope_name: Value,
128 scope_version: Value,
129 scope_attrs: Value,
130) -> Value {
131 let log_attrs = Value::Object(key_value_to_map(log.attributes));
132 let mut map = Map::new();
133 map.insert("Timestamp".to_string(), Value::from(log.time_unix_nano));
134 map.insert(
135 "ObservedTimestamp".to_string(),
136 Value::from(log.observed_time_unix_nano),
137 );
138
139 map.insert(
141 "TraceId".to_string(),
142 Value::String(bytes_to_hex_string(&log.trace_id)),
143 );
144 map.insert(
145 "SpanId".to_string(),
146 Value::String(bytes_to_hex_string(&log.span_id)),
147 );
148 map.insert("TraceFlags".to_string(), Value::from(log.flags));
149 map.insert("SeverityText".to_string(), Value::String(log.severity_text));
150 map.insert(
151 "SeverityNumber".to_string(),
152 Value::from(log.severity_number),
153 );
154 map.insert(
156 "Body".to_string(),
157 log.body
158 .as_ref()
159 .map(|x| Value::String(log_body_to_string(x)))
160 .unwrap_or(Value::Null),
161 );
162 map.insert("ResourceSchemaUrl".to_string(), resource_schema_url);
163
164 map.insert("ResourceAttributes".to_string(), resource_attr);
165 map.insert("ScopeSchemaUrl".to_string(), scope_schema_url);
166 map.insert("ScopeName".to_string(), scope_name);
167 map.insert("ScopeVersion".to_string(), scope_version);
168 map.insert("ScopeAttributes".to_string(), scope_attrs);
169 map.insert("LogAttributes".to_string(), log_attrs);
170 Value::Object(map)
171}
172
173fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
174 [
175 (
176 "timestamp",
177 ColumnDataType::TimestampNanosecond,
178 SemanticType::Timestamp,
179 None,
180 None,
181 ),
182 (
183 "trace_id",
184 ColumnDataType::String,
185 SemanticType::Field,
186 None,
187 None,
188 ),
189 (
190 "span_id",
191 ColumnDataType::String,
192 SemanticType::Field,
193 None,
194 None,
195 ),
196 (
197 "severity_text",
198 ColumnDataType::String,
199 SemanticType::Field,
200 None,
201 None,
202 ),
203 (
204 "severity_number",
205 ColumnDataType::Int32,
206 SemanticType::Field,
207 None,
208 None,
209 ),
210 (
211 "body",
212 ColumnDataType::String,
213 SemanticType::Field,
214 None,
215 Some(ColumnOptions {
216 options: StdHashMap::from([(
217 "fulltext".to_string(),
218 r#"{"enable":true}"#.to_string(),
219 )]),
220 }),
221 ),
222 (
223 "log_attributes",
224 ColumnDataType::Binary,
225 SemanticType::Field,
226 Some(ColumnDataTypeExtension {
227 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
228 }),
229 None,
230 ),
231 (
232 "trace_flags",
233 ColumnDataType::Uint32,
234 SemanticType::Field,
235 None,
236 None,
237 ),
238 (
239 "scope_name",
240 ColumnDataType::String,
241 SemanticType::Tag,
242 None,
243 None,
244 ),
245 (
246 "scope_version",
247 ColumnDataType::String,
248 SemanticType::Field,
249 None,
250 None,
251 ),
252 (
253 "scope_attributes",
254 ColumnDataType::Binary,
255 SemanticType::Field,
256 Some(ColumnDataTypeExtension {
257 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
258 }),
259 None,
260 ),
261 (
262 "scope_schema_url",
263 ColumnDataType::String,
264 SemanticType::Field,
265 None,
266 None,
267 ),
268 (
269 "resource_attributes",
270 ColumnDataType::Binary,
271 SemanticType::Field,
272 Some(ColumnDataTypeExtension {
273 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
274 }),
275 None,
276 ),
277 (
278 "resource_schema_url",
279 ColumnDataType::String,
280 SemanticType::Field,
281 None,
282 None,
283 ),
284 ]
285 .into_iter()
286 .map(
287 |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
288 column_name: field_name.to_string(),
289 datatype: column_type as i32,
290 semantic_type: semantic_type as i32,
291 datatype_extension,
292 options,
293 },
294 )
295 .collect::<Vec<ColumnSchema>>()
296}
297
298fn build_otlp_build_in_row(
299 log: LogRecord,
300 parse_ctx: &mut ParseContext,
301) -> (Row, JsonbValue<'static>) {
302 let log_attr = key_value_to_jsonb(log.attributes);
303 let ts = if log.time_unix_nano != 0 {
304 log.time_unix_nano
305 } else {
306 log.observed_time_unix_nano
307 };
308
309 let row = vec![
310 GreptimeValue {
311 value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
312 },
313 GreptimeValue {
314 value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
315 },
316 GreptimeValue {
317 value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
318 },
319 GreptimeValue {
320 value_data: Some(ValueData::StringValue(log.severity_text)),
321 },
322 GreptimeValue {
323 value_data: Some(ValueData::I32Value(log.severity_number)),
324 },
325 GreptimeValue {
326 value_data: log
327 .body
328 .as_ref()
329 .map(|x| ValueData::StringValue(log_body_to_string(x))),
330 },
331 GreptimeValue {
332 value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
333 },
334 GreptimeValue {
335 value_data: Some(ValueData::U32Value(log.flags)),
336 },
337 GreptimeValue {
338 value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
339 },
340 GreptimeValue {
341 value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
342 },
343 GreptimeValue {
344 value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
345 },
346 GreptimeValue {
347 value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
348 },
349 GreptimeValue {
350 value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
351 },
352 GreptimeValue {
353 value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
354 },
355 ];
356 (Row { values: row }, log_attr)
357}
358
/// Extracts the values of `select_info.keys` from `attrs`, growing
/// `select_schema` with any key seen for the first time.
///
/// Returns one `GreptimeValue` per column currently in `select_schema`
/// (defaulted/NULL for keys absent from `attrs`). Invariant: new schema
/// entries and new value slots are pushed in lockstep, so the index stored in
/// `select_schema.index` always addresses both the schema vector and the
/// returned value vector. Rows produced before a column was added are shorter
/// and are padded later by the caller.
///
/// Errors with `IncompatibleSchema` if a key re-appears with a different
/// datatype than previously recorded.
fn extract_field_from_attr_and_combine_schema(
    select_info: &SelectInfo,
    select_schema: &mut SchemaInfo,
    attrs: &jsonb::Value,
) -> Result<Vec<GreptimeValue>> {
    // Pre-fill one slot per already-known column.
    let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];

    for key in select_info.keys.iter() {
        // Key lookup is case-insensitive against the attribute map.
        let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
            continue;
        };
        // None means the value type cannot back a column (e.g. JSON null).
        let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
            continue;
        };

        if let Some(index) = select_schema.index.get(key) {
            // Known column: the datatype must match what was recorded before.
            let column_schema = &select_schema.schema[*index];
            ensure!(
                column_schema.datatype == schema.datatype,
                IncompatibleSchemaSnafu {
                    column_name: key,
                    datatype: column_schema.datatype().as_str_name(),
                    expected: column_schema.datatype,
                    actual: schema.datatype,
                }
            );
            extracted_values[*index] = value;
        } else {
            // New column: append schema, record its index, append the value.
            select_schema.schema.push(schema);
            select_schema
                .index
                .insert(key.clone(), select_schema.schema.len() - 1);
            extracted_values.push(value);
        }
    }

    Ok(extracted_values)
}
400
401fn decide_column_schema_and_convert_value(
402 column_name: &str,
403 value: JsonbValue,
404) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
405 let column_info = match value {
406 JsonbValue::String(s) => Ok(Some((
407 GreptimeValue {
408 value_data: Some(ValueData::StringValue(s.into())),
409 },
410 ColumnDataType::String,
411 SemanticType::Tag,
412 None,
413 ))),
414 JsonbValue::Number(n) => match n {
415 JsonbNumber::Int64(i) => Ok(Some((
416 GreptimeValue {
417 value_data: Some(ValueData::I64Value(i)),
418 },
419 ColumnDataType::Int64,
420 SemanticType::Tag,
421 None,
422 ))),
423 JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
424 ty: "FLOAT".to_string(),
425 key: column_name,
426 }
427 .fail(),
428 JsonbNumber::UInt64(u) => Ok(Some((
429 GreptimeValue {
430 value_data: Some(ValueData::U64Value(u)),
431 },
432 ColumnDataType::Uint64,
433 SemanticType::Tag,
434 None,
435 ))),
436 },
437 JsonbValue::Bool(b) => Ok(Some((
438 GreptimeValue {
439 value_data: Some(ValueData::BoolValue(b)),
440 },
441 ColumnDataType::Boolean,
442 SemanticType::Tag,
443 None,
444 ))),
445 JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
446 ty: "Json".to_string(),
447 key: column_name,
448 }
449 .fail(),
450 JsonbValue::Null => Ok(None),
451 };
452 column_info.map(|c| {
453 c.map(|(value, column_type, semantic_type, datatype_extension)| {
454 (
455 ColumnSchema {
456 column_name: column_name.to_string(),
457 datatype: column_type as i32,
458 semantic_type: semantic_type as i32,
459 datatype_extension,
460 options: None,
461 },
462 value,
463 )
464 })
465 })
466}
467
468fn parse_export_logs_service_request_to_rows(
469 request: ExportLogsServiceRequest,
470 select_info: Box<SelectInfo>,
471) -> Result<Rows> {
472 let mut schemas = build_otlp_logs_identity_schema();
473
474 let mut parse_ctx = ParseContext::new(select_info);
475 let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
476
477 schemas.extend(parse_ctx.select_schema.schema);
478
479 rows.iter_mut().for_each(|row| {
480 row.values.resize(schemas.len(), GreptimeValue::default());
481 });
482
483 Ok(Rows {
484 schema: schemas,
485 rows,
486 })
487}
488
489fn parse_resource(
490 parse_ctx: &mut ParseContext,
491 resource_logs_vec: Vec<ResourceLogs>,
492) -> Result<Vec<Row>> {
493 let total_len = resource_logs_vec
494 .iter()
495 .flat_map(|r| r.scope_logs.iter())
496 .map(|s| s.log_records.len())
497 .sum();
498
499 let mut results = Vec::with_capacity(total_len);
500
501 for r in resource_logs_vec {
502 parse_ctx.resource_attr = r
503 .resource
504 .map(|resource| key_value_to_jsonb(resource.attributes))
505 .unwrap_or(JsonbValue::Null);
506
507 parse_ctx.resource_url = r.schema_url;
508
509 parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
510 &parse_ctx.select_info,
511 &mut parse_ctx.select_schema,
512 &parse_ctx.resource_attr,
513 )?;
514
515 let rows = parse_scope(r.scope_logs, parse_ctx)?;
516 results.extend(rows);
517 }
518 Ok(results)
519}
520
/// Mutable state threaded through resource → scope → log parsing.
struct ParseContext<'a> {
    // Keys to promote ("uplift") from attributes into dedicated columns.
    select_info: Box<SelectInfo>,
    // Schema accumulated for the promoted columns as keys are discovered.
    select_schema: SchemaInfo,

    // Values extracted from the current resource's attributes.
    resource_uplift_values: Vec<GreptimeValue>,
    // Values extracted from the current scope's attributes.
    scope_uplift_values: Vec<GreptimeValue>,

    // Schema URL and attributes of the resource currently being parsed.
    resource_url: String,
    resource_attr: JsonbValue<'a>,
    // Name/version/schema-URL/attributes of the scope currently being parsed.
    scope_name: Option<String>,
    scope_version: Option<String>,
    scope_url: String,
    scope_attrs: JsonbValue<'a>,
}
540
541impl<'a> ParseContext<'a> {
542 pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
543 let len = select_info.keys.len();
544 ParseContext {
545 select_info,
546 select_schema: SchemaInfo::with_capacity(len),
547 resource_uplift_values: vec![],
548 scope_uplift_values: vec![],
549 resource_url: String::new(),
550 resource_attr: JsonbValue::Null,
551 scope_name: None,
552 scope_version: None,
553 scope_url: String::new(),
554 scope_attrs: JsonbValue::Null,
555 }
556 }
557}
558
559fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
560 let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
561 let mut results = Vec::with_capacity(len);
562
563 for scope_logs in scopes_log_vec {
564 let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
565 parse_ctx.scope_name = scope_name;
566 parse_ctx.scope_version = scope_version;
567 parse_ctx.scope_url = scope_logs.schema_url;
568 parse_ctx.scope_attrs = scope_attrs;
569
570 parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
571 &parse_ctx.select_info,
572 &mut parse_ctx.select_schema,
573 &parse_ctx.scope_attrs,
574 )?;
575
576 let rows = parse_log(scope_logs.log_records, parse_ctx)?;
577 results.extend(rows);
578 }
579 Ok(results)
580}
581
582fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
583 let mut result = Vec::with_capacity(log_records.len());
584
585 for log in log_records {
586 let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
587
588 let log_values = extract_field_from_attr_and_combine_schema(
589 &parse_ctx.select_info,
590 &mut parse_ctx.select_schema,
591 &log_attr,
592 )?;
593
594 let extracted_values = merge_values(
595 log_values,
596 &parse_ctx.scope_uplift_values,
597 &parse_ctx.resource_uplift_values,
598 );
599
600 row.values.extend(extracted_values);
601
602 result.push(row);
603 }
604 Ok(result)
605}
606
607fn merge_values(
608 log: Vec<GreptimeValue>,
609 scope: &[GreptimeValue],
610 resource: &[GreptimeValue],
611) -> Vec<GreptimeValue> {
612 log.into_iter()
613 .enumerate()
614 .map(|(i, value)| GreptimeValue {
615 value_data: value
616 .value_data
617 .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
618 .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
619 })
620 .collect()
621}
622
623fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<Value> {
626 let mut result = Vec::new();
627 for r in request.resource_logs {
628 let resource_attr = r
629 .resource
630 .map(|x| Value::Object(key_value_to_map(x.attributes)))
631 .unwrap_or(Value::Null);
632 let resource_schema_url = Value::String(r.schema_url);
633 for scope_logs in r.scope_logs {
634 let (scope_attrs, scope_version, scope_name) =
635 scope_to_pipeline_value(scope_logs.scope);
636 let scope_schema_url = Value::String(scope_logs.schema_url);
637 for log in scope_logs.log_records {
638 let value = log_to_pipeline_value(
639 log,
640 resource_schema_url.clone(),
641 resource_attr.clone(),
642 scope_schema_url.clone(),
643 scope_name.clone(),
644 scope_version.clone(),
645 scope_attrs.clone(),
646 );
647 result.push(value);
648 }
649 }
650 }
651 result
652}
653
654fn any_value_to_pipeline_value(value: any_value::Value) -> Value {
656 match value {
657 any_value::Value::StringValue(s) => Value::String(s),
658 any_value::Value::IntValue(i) => Value::from(i),
659 any_value::Value::DoubleValue(d) => Value::from(d),
660 any_value::Value::BoolValue(b) => Value::Bool(b),
661 any_value::Value::ArrayValue(a) => {
662 let values = a
663 .values
664 .into_iter()
665 .map(|v| match v.value {
666 Some(value) => any_value_to_pipeline_value(value),
667 None => Value::Null,
668 })
669 .collect();
670 Value::Array(values)
671 }
672 any_value::Value::KvlistValue(kv) => {
673 let value = key_value_to_map(kv.values);
674 Value::Object(value)
675 }
676 any_value::Value::BytesValue(b) => Value::String(bytes_to_hex_string(&b)),
677 }
678}
679
680fn key_value_to_map(key_values: Vec<KeyValue>) -> Map<String, Value> {
682 let mut map = Map::new();
683 for kv in key_values {
684 let value = match kv.value {
685 Some(value) => match value.value {
686 Some(value) => any_value_to_pipeline_value(value),
687 None => Value::Null,
688 },
689 None => Value::Null,
690 };
691 map.insert(kv.key.clone(), value);
692 }
693 map
694}
695
696fn log_body_to_string(body: &AnyValue) -> String {
697 let otlp_value = OtlpAnyValue::from(body);
698 otlp_value.to_string()
699}