1use std::collections::BTreeMap;
16
17use ahash::{HashMap, HashMapExt};
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::column_data_type_extension::TypeExt;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::value::ValueData;
22use api::v1::{
23 ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row,
24 RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
25};
26use bytes::Bytes;
27use common_time::Timestamp;
28use common_time::timestamp::TimeUnit;
29use jsonb::{Number as JsonbNumber, Value as JsonbValue};
30use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
31use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value};
32use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
33use pipeline::{
34 ContextReq, GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo,
35};
36use session::context::QueryContextRef;
37use snafu::ensure;
38use vrl::prelude::NotNan;
39use vrl::value::{KeyString, Value as VrlValue};
40
41use crate::error::{
42 Error, IncompatibleSchemaSnafu, InvalidParameterSnafu, NotSupportedSnafu, Result,
43 UnsupportedJsonDataTypeForTagSnafu,
44};
45use crate::http::event::PipelineIngestRequest;
46use crate::otlp::coerce::coerce_value_data;
47use crate::otlp::trace::attributes::OtlpAnyValue;
48use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
49use crate::pipeline::run_pipeline;
50use crate::query_handler::PipelineHandlerRef;
51
/// Name of the table OpenTelemetry logs are written to.
///
/// NOTE(review): presumably the default target table when a request does not name one —
/// confirm at call sites.
pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";
53
54pub async fn to_grpc_insert_requests(
62 request: ExportLogsServiceRequest,
63 pipeline: PipelineWay,
64 pipeline_params: GreptimePipelineParams,
65 table_name: String,
66 query_ctx: &QueryContextRef,
67 pipeline_handler: PipelineHandlerRef,
68) -> Result<ContextReq> {
69 match pipeline {
70 PipelineWay::OtlpLogDirect(select_info) => {
71 let table = pipeline_handler
72 .get_table(&table_name, query_ctx)
73 .await
74 .map_err(Error::from)?;
75 let existing_schema = table
76 .as_deref()
77 .map(ExistingLogSchema::try_from_table)
78 .transpose()?;
79 let rows = parse_export_logs_service_request_to_rows(
80 request,
81 select_info,
82 existing_schema.as_ref(),
83 &table_name,
84 )?;
85 let insert_request = RowInsertRequest {
86 rows: Some(rows),
87 table_name,
88 };
89
90 Ok(ContextReq::default_opt_with_reqs(vec![insert_request]))
91 }
92 PipelineWay::Pipeline(pipeline_def) => {
93 let array = parse_export_logs_service_request(request);
94
95 let pipeline_ctx =
96 PipelineContext::new(&pipeline_def, &pipeline_params, query_ctx.channel());
97 run_pipeline(
98 &pipeline_handler,
99 &pipeline_ctx,
100 PipelineIngestRequest {
101 table: table_name,
102 values: array,
103 },
104 query_ctx,
105 true,
106 )
107 .await
108 }
109 _ => NotSupportedSnafu {
110 feat: "Unsupported pipeline for logs",
111 }
112 .fail(),
113 }
114}
115
116fn scope_to_pipeline_value(scope: Option<InstrumentationScope>) -> (VrlValue, VrlValue, VrlValue) {
117 scope
118 .map(|x| {
119 (
120 VrlValue::Object(key_value_to_map(x.attributes)),
121 VrlValue::Bytes(x.version.into()),
122 VrlValue::Bytes(x.name.into()),
123 )
124 })
125 .unwrap_or((VrlValue::Null, VrlValue::Null, VrlValue::Null))
126}
127
128fn scope_to_jsonb(
129 scope: Option<InstrumentationScope>,
130) -> (JsonbValue<'static>, Option<String>, Option<String>) {
131 scope
132 .map(|x| {
133 (
134 key_value_to_jsonb(x.attributes),
135 Some(x.version),
136 Some(x.name),
137 )
138 })
139 .unwrap_or((JsonbValue::Null, None, None))
140}
141
142fn log_to_pipeline_value(
143 log: LogRecord,
144 resource_schema_url: VrlValue,
145 resource_attr: VrlValue,
146 scope_schema_url: VrlValue,
147 scope_name: VrlValue,
148 scope_version: VrlValue,
149 scope_attrs: VrlValue,
150) -> VrlValue {
151 let log_attrs = VrlValue::Object(key_value_to_map(log.attributes));
152 let mut map = BTreeMap::new();
153 map.insert(
154 "Timestamp".into(),
155 VrlValue::Integer(log.time_unix_nano as i64),
156 );
157 map.insert(
158 "ObservedTimestamp".into(),
159 VrlValue::Integer(log.observed_time_unix_nano as i64),
160 );
161
162 map.insert(
164 "TraceId".into(),
165 VrlValue::Bytes(bytes_to_hex_string(&log.trace_id).into()),
166 );
167 map.insert(
168 "SpanId".into(),
169 VrlValue::Bytes(bytes_to_hex_string(&log.span_id).into()),
170 );
171 map.insert("TraceFlags".into(), VrlValue::Integer(log.flags as i64));
172 map.insert(
173 "SeverityText".into(),
174 VrlValue::Bytes(log.severity_text.into()),
175 );
176 map.insert(
177 "SeverityNumber".into(),
178 VrlValue::Integer(log.severity_number as i64),
179 );
180 map.insert(
182 "Body".into(),
183 log.body
184 .as_ref()
185 .map(|x| VrlValue::Bytes(log_body_to_string(x).into()))
186 .unwrap_or(VrlValue::Null),
187 );
188 map.insert("ResourceSchemaUrl".into(), resource_schema_url);
189
190 map.insert("ResourceAttributes".into(), resource_attr);
191 map.insert("ScopeSchemaUrl".into(), scope_schema_url);
192 map.insert("ScopeName".into(), scope_name);
193 map.insert("ScopeVersion".into(), scope_version);
194 map.insert("ScopeAttributes".into(), scope_attrs);
195 map.insert("LogAttributes".into(), log_attrs);
196 VrlValue::Object(map)
197}
198
199fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
200 [
201 (
202 "timestamp",
203 ColumnDataType::TimestampNanosecond,
204 SemanticType::Timestamp,
205 None,
206 None,
207 ),
208 (
209 "trace_id",
210 ColumnDataType::String,
211 SemanticType::Field,
212 None,
213 None,
214 ),
215 (
216 "span_id",
217 ColumnDataType::String,
218 SemanticType::Field,
219 None,
220 None,
221 ),
222 (
223 "severity_text",
224 ColumnDataType::String,
225 SemanticType::Field,
226 None,
227 None,
228 ),
229 (
230 "severity_number",
231 ColumnDataType::Int32,
232 SemanticType::Field,
233 None,
234 None,
235 ),
236 (
237 "body",
238 ColumnDataType::String,
239 SemanticType::Field,
240 None,
241 Some(ColumnOptions {
242 options: std::collections::HashMap::from([(
243 "fulltext".to_string(),
244 r#"{"enable":true}"#.to_string(),
245 )]),
246 }),
247 ),
248 (
249 "log_attributes",
250 ColumnDataType::Binary,
251 SemanticType::Field,
252 Some(ColumnDataTypeExtension {
253 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
254 }),
255 None,
256 ),
257 (
258 "trace_flags",
259 ColumnDataType::Uint32,
260 SemanticType::Field,
261 None,
262 None,
263 ),
264 (
265 "scope_name",
266 ColumnDataType::String,
267 SemanticType::Tag,
268 None,
269 None,
270 ),
271 (
272 "scope_version",
273 ColumnDataType::String,
274 SemanticType::Field,
275 None,
276 None,
277 ),
278 (
279 "scope_attributes",
280 ColumnDataType::Binary,
281 SemanticType::Field,
282 Some(ColumnDataTypeExtension {
283 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
284 }),
285 None,
286 ),
287 (
288 "scope_schema_url",
289 ColumnDataType::String,
290 SemanticType::Field,
291 None,
292 None,
293 ),
294 (
295 "resource_attributes",
296 ColumnDataType::Binary,
297 SemanticType::Field,
298 Some(ColumnDataTypeExtension {
299 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
300 }),
301 None,
302 ),
303 (
304 "resource_schema_url",
305 ColumnDataType::String,
306 SemanticType::Field,
307 None,
308 None,
309 ),
310 ]
311 .into_iter()
312 .map(
313 |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema {
314 column_name: field_name.to_string(),
315 datatype: column_type as i32,
316 semantic_type: semantic_type as i32,
317 datatype_extension,
318 options,
319 },
320 )
321 .collect::<Vec<ColumnSchema>>()
322}
323
324#[derive(Clone)]
325struct ExistingLogColumn {
326 schema: ColumnSchema,
327 datatype: ColumnDataType,
328}
329
330impl ExistingLogColumn {
331 fn schema_for_request_type(&self, request_type: ColumnDataType) -> ColumnSchema {
332 let mut schema = self.schema.clone();
333 if request_type == ColumnDataType::Binary && self.is_json_binary() {
334 schema.datatype = ColumnDataType::Binary as i32;
335 }
336 schema
337 }
338
339 fn is_json_binary(&self) -> bool {
340 self.datatype == ColumnDataType::Json
341 && matches!(
342 self.schema
343 .datatype_extension
344 .as_ref()
345 .and_then(|datatype_extension| datatype_extension.type_ext.as_ref()),
346 Some(TypeExt::JsonType(json_type))
347 if *json_type == JsonTypeExtension::JsonBinary as i32
348 )
349 }
350}
351
352#[derive(Default)]
353struct ExistingLogSchema {
354 columns: HashMap<String, ExistingLogColumn>,
355}
356
357impl ExistingLogSchema {
358 fn try_from_table(table: &table::Table) -> Result<Self> {
359 let table_info = table.table_info();
360 Self::try_from_schema_parts(
361 table.schema_ref().column_schemas(),
362 &table_info.meta.primary_key_indices,
363 )
364 }
365
366 fn try_from_schema_parts(
367 column_schemas: &[datatypes::schema::ColumnSchema],
368 primary_key_indices: &[usize],
369 ) -> Result<Self> {
370 let mut columns = HashMap::with_capacity(column_schemas.len());
371
372 for (index, column_schema) in column_schemas.iter().enumerate() {
373 let (datatype, datatype_extension) =
374 ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
375 .map(|wrapper| wrapper.into_parts())
376 .map_err(Error::from)?;
377 let semantic_type = if column_schema.is_time_index() {
378 SemanticType::Timestamp
379 } else if primary_key_indices.contains(&index) {
380 SemanticType::Tag
381 } else {
382 SemanticType::Field
383 };
384 let schema = ColumnSchema {
385 column_name: column_schema.name.clone(),
386 datatype: datatype as i32,
387 semantic_type: semantic_type as i32,
388 datatype_extension,
389 options: options_from_column_schema(column_schema),
390 };
391 columns.insert(
392 schema.column_name.clone(),
393 ExistingLogColumn { schema, datatype },
394 );
395 }
396
397 Ok(Self { columns })
398 }
399
400 fn get(&self, column_name: &str) -> Option<&ExistingLogColumn> {
401 self.columns.get(column_name)
402 }
403}
404
405fn build_otlp_build_in_row(
406 log: LogRecord,
407 parse_ctx: &mut ParseContext,
408) -> (Row, JsonbValue<'static>) {
409 let log_attr = key_value_to_jsonb(log.attributes);
410 let ts = if log.time_unix_nano != 0 {
411 log.time_unix_nano
412 } else {
413 log.observed_time_unix_nano
414 };
415
416 let row = vec![
417 GreptimeValue {
418 value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)),
419 },
420 GreptimeValue {
421 value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
422 },
423 GreptimeValue {
424 value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))),
425 },
426 GreptimeValue {
427 value_data: Some(ValueData::StringValue(log.severity_text)),
428 },
429 GreptimeValue {
430 value_data: Some(ValueData::I32Value(log.severity_number)),
431 },
432 GreptimeValue {
433 value_data: log
434 .body
435 .as_ref()
436 .map(|x| ValueData::StringValue(log_body_to_string(x))),
437 },
438 GreptimeValue {
439 value_data: Some(ValueData::BinaryValue(log_attr.to_vec())),
440 },
441 GreptimeValue {
442 value_data: Some(ValueData::U32Value(log.flags)),
443 },
444 GreptimeValue {
445 value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue),
446 },
447 GreptimeValue {
448 value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue),
449 },
450 GreptimeValue {
451 value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())),
452 },
453 GreptimeValue {
454 value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())),
455 },
456 GreptimeValue {
457 value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())),
458 },
459 GreptimeValue {
460 value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
461 },
462 ];
463 (Row { values: row }, log_attr)
464}
465
466fn extract_field_from_attr_and_combine_schema(
467 select_info: &SelectInfo,
468 select_schema: &mut SchemaInfo,
469 attrs: &jsonb::Value,
470 existing_schema: Option<&ExistingLogSchema>,
471 table_name: &str,
472) -> Result<Vec<GreptimeValue>> {
473 let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
476
477 for key in select_info.keys.iter() {
478 let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
479 continue;
480 };
481 let Some((schema, value)) =
482 decide_column_schema_and_convert_value(key, value, existing_schema, table_name)?
483 else {
484 continue;
485 };
486
487 if let Some(index) = select_schema.index.get(key) {
488 let column_schema = &select_schema.schema[*index];
489 let column_schema: ColumnSchema = column_schema.clone().try_into()?;
490 ensure!(
492 column_schema.datatype == schema.datatype,
493 IncompatibleSchemaSnafu {
494 column_name: key,
495 datatype: column_schema.datatype().as_str_name(),
496 expected: column_schema.datatype,
497 actual: schema.datatype,
498 }
499 );
500 extracted_values[*index] = value;
501 } else {
502 select_schema.schema.push(schema.into());
503 select_schema
504 .index
505 .insert(key.clone(), select_schema.schema.len() - 1);
506 extracted_values.push(value);
507 }
508 }
509
510 Ok(extracted_values)
511}
512
513fn decide_column_schema_and_convert_value(
514 column_name: &str,
515 value: JsonbValue,
516 existing_schema: Option<&ExistingLogSchema>,
517 table_name: &str,
518) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
519 if let Some(existing_column) = existing_schema.and_then(|schema| schema.get(column_name)) {
520 return decide_existing_column_schema_and_convert_value(
521 column_name,
522 value,
523 existing_column,
524 table_name,
525 );
526 }
527
528 let column_info = match value {
529 JsonbValue::String(s) => Ok(Some((
530 GreptimeValue {
531 value_data: Some(ValueData::StringValue(s.into())),
532 },
533 ColumnDataType::String,
534 SemanticType::Tag,
535 None,
536 ))),
537 JsonbValue::Number(n) => match n {
538 JsonbNumber::Int64(i) => Ok(Some((
539 GreptimeValue {
540 value_data: Some(ValueData::I64Value(i)),
541 },
542 ColumnDataType::Int64,
543 SemanticType::Tag,
544 None,
545 ))),
546 JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
547 ty: "FLOAT".to_string(),
548 key: column_name,
549 }
550 .fail(),
551 JsonbNumber::UInt64(u) => Ok(Some((
552 GreptimeValue {
553 value_data: Some(ValueData::U64Value(u)),
554 },
555 ColumnDataType::Uint64,
556 SemanticType::Tag,
557 None,
558 ))),
559 },
560 JsonbValue::Bool(b) => Ok(Some((
561 GreptimeValue {
562 value_data: Some(ValueData::BoolValue(b)),
563 },
564 ColumnDataType::Boolean,
565 SemanticType::Tag,
566 None,
567 ))),
568 JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
569 ty: "Json".to_string(),
570 key: column_name,
571 }
572 .fail(),
573 JsonbValue::Null => Ok(None),
574 };
575 column_info.map(|c| {
576 c.map(|(value, column_type, semantic_type, datatype_extension)| {
577 (
578 ColumnSchema {
579 column_name: column_name.to_string(),
580 datatype: column_type as i32,
581 semantic_type: semantic_type as i32,
582 datatype_extension,
583 options: None,
584 },
585 value,
586 )
587 })
588 })
589}
590
591fn decide_existing_column_schema_and_convert_value(
592 column_name: &str,
593 value: JsonbValue,
594 existing_column: &ExistingLogColumn,
595 table_name: &str,
596) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
597 let Some((value_data, request_type)) = jsonb_value_to_log_value_data(column_name, value, true)?
598 else {
599 return Ok(None);
600 };
601 let value_data = coerce_log_value_data(
602 Some(value_data),
603 existing_column.datatype,
604 existing_column.schema.semantic_type(),
605 request_type,
606 existing_column.is_json_binary(),
607 column_name,
608 table_name,
609 )?;
610
611 Ok(Some((
612 existing_column.schema.clone(),
613 GreptimeValue { value_data },
614 )))
615}
616
617fn jsonb_value_to_log_value_data(
618 column_name: &str,
619 value: JsonbValue,
620 allow_float: bool,
621) -> Result<Option<(ValueData, ColumnDataType)>> {
622 match value {
623 JsonbValue::String(s) => Ok(Some((
624 ValueData::StringValue(s.into()),
625 ColumnDataType::String,
626 ))),
627 JsonbValue::Number(n) => match n {
628 JsonbNumber::Int64(i) => Ok(Some((ValueData::I64Value(i), ColumnDataType::Int64))),
629 JsonbNumber::Float64(f) if allow_float => {
630 Ok(Some((ValueData::F64Value(f), ColumnDataType::Float64)))
631 }
632 JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu {
633 ty: "FLOAT".to_string(),
634 key: column_name,
635 }
636 .fail(),
637 JsonbNumber::UInt64(u) => Ok(Some((ValueData::U64Value(u), ColumnDataType::Uint64))),
638 },
639 JsonbValue::Bool(b) => Ok(Some((ValueData::BoolValue(b), ColumnDataType::Boolean))),
640 JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu {
641 ty: "Json".to_string(),
642 key: column_name,
643 }
644 .fail(),
645 JsonbValue::Null => Ok(None),
646 }
647}
648
649fn align_rows_with_existing_schema(
650 schemas: &mut [ColumnSchema],
651 rows: &mut [Row],
652 existing_schema: Option<&ExistingLogSchema>,
653 table_name: &str,
654) -> Result<()> {
655 let Some(existing_schema) = existing_schema else {
656 return Ok(());
657 };
658
659 for (column_idx, schema) in schemas.iter_mut().enumerate() {
660 let request_type = schema.datatype();
661 let Some(existing_column) = existing_schema.get(&schema.column_name) else {
662 if schema.semantic_type() == SemanticType::Tag {
665 schema.semantic_type = SemanticType::Field as i32;
666 }
667 continue;
668 };
669
670 let target_type = existing_column.datatype;
671 let semantic_type = existing_column.schema.semantic_type();
672 let target_is_json_binary = existing_column.is_json_binary();
673 for row in rows.iter_mut() {
674 let Some(value) = row.values.get_mut(column_idx) else {
675 continue;
676 };
677 value.value_data = coerce_log_value_data(
678 value.value_data.take(),
679 target_type,
680 semantic_type,
681 request_type,
682 target_is_json_binary,
683 &schema.column_name,
684 table_name,
685 )?;
686 }
687 *schema = existing_column.schema_for_request_type(request_type);
688 }
689
690 Ok(())
691}
692
693fn coerce_log_value_data(
694 value_data: Option<ValueData>,
695 target_type: ColumnDataType,
696 _semantic_type: SemanticType,
697 request_type: ColumnDataType,
698 target_is_json_binary: bool,
699 column_name: &str,
700 table_name: &str,
701) -> Result<Option<ValueData>> {
702 let Some(value_data) = value_data else {
703 return Ok(None);
704 };
705
706 if request_type == target_type {
707 return Ok(Some(value_data));
708 }
709
710 if request_type == ColumnDataType::Binary && target_is_json_binary {
711 return Ok(Some(value_data));
712 }
713
714 if is_timestamp_type(request_type)
715 && let Some(target_unit) = timestamp_unit(target_type)
716 {
717 return align_timestamp_value(value_data, target_unit, column_name, table_name).map(Some);
718 }
719
720 if target_type == ColumnDataType::String {
721 if let Ok(value_data) =
722 coerce_value_data(&Some(value_data.clone()), target_type, request_type)
723 {
724 return Ok(value_data);
725 }
726 if let Some(value_data) = stringify_scalar_value(value_data) {
727 return Ok(Some(value_data));
728 }
729 }
730
731 InvalidParameterSnafu {
732 reason: format!(
733 "failed to align log column '{}' in table '{}' from {:?} to {:?}",
734 column_name, table_name, request_type, target_type
735 ),
736 }
737 .fail()
738}
739
740fn stringify_scalar_value(value_data: ValueData) -> Option<ValueData> {
741 let value = match value_data {
742 ValueData::StringValue(value) => value,
743 ValueData::BoolValue(value) => value.to_string(),
744 ValueData::I8Value(value) => value.to_string(),
745 ValueData::I16Value(value) => value.to_string(),
746 ValueData::I32Value(value) => value.to_string(),
747 ValueData::I64Value(value) => value.to_string(),
748 ValueData::U8Value(value) => value.to_string(),
749 ValueData::U16Value(value) => value.to_string(),
750 ValueData::U32Value(value) => value.to_string(),
751 ValueData::U64Value(value) => value.to_string(),
752 ValueData::F32Value(value) => value.to_string(),
753 ValueData::F64Value(value) => value.to_string(),
754 _ => return None,
755 };
756 Some(ValueData::StringValue(value))
757}
758
759fn align_timestamp_value(
760 value_data: ValueData,
761 target_unit: TimeUnit,
762 column_name: &str,
763 table_name: &str,
764) -> Result<ValueData> {
765 let timestamp = match value_data {
766 ValueData::TimestampSecondValue(value) => Timestamp::new_second(value),
767 ValueData::TimestampMillisecondValue(value) => Timestamp::new_millisecond(value),
768 ValueData::TimestampMicrosecondValue(value) => Timestamp::new_microsecond(value),
769 ValueData::TimestampNanosecondValue(value) => Timestamp::new_nanosecond(value),
770 value_data => {
771 return InvalidParameterSnafu {
772 reason: format!(
773 "failed to align log column '{}' in table '{}' from non-timestamp value {:?}",
774 column_name, table_name, value_data
775 ),
776 }
777 .fail();
778 }
779 };
780 let timestamp = timestamp.convert_to(target_unit).ok_or_else(|| {
781 InvalidParameterSnafu {
782 reason: format!(
783 "failed to align log column '{}' in table '{}' to timestamp unit {}",
784 column_name, table_name, target_unit
785 ),
786 }
787 .build()
788 })?;
789
790 Ok(match target_unit {
791 TimeUnit::Second => ValueData::TimestampSecondValue(timestamp.value()),
792 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(timestamp.value()),
793 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(timestamp.value()),
794 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(timestamp.value()),
795 })
796}
797
798fn is_timestamp_type(datatype: ColumnDataType) -> bool {
799 timestamp_unit(datatype).is_some()
800}
801
802fn timestamp_unit(datatype: ColumnDataType) -> Option<TimeUnit> {
803 match datatype {
804 ColumnDataType::TimestampSecond => Some(TimeUnit::Second),
805 ColumnDataType::TimestampMillisecond => Some(TimeUnit::Millisecond),
806 ColumnDataType::TimestampMicrosecond => Some(TimeUnit::Microsecond),
807 ColumnDataType::TimestampNanosecond => Some(TimeUnit::Nanosecond),
808 _ => None,
809 }
810}
811
812fn parse_export_logs_service_request_to_rows(
813 request: ExportLogsServiceRequest,
814 select_info: Box<SelectInfo>,
815 existing_schema: Option<&ExistingLogSchema>,
816 table_name: &str,
817) -> Result<Rows> {
818 let mut schemas = build_otlp_logs_identity_schema();
819
820 let mut parse_ctx = ParseContext::new(select_info, existing_schema, table_name);
821 let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
822
823 schemas.extend(parse_ctx.select_schema.column_schemas()?);
824 align_rows_with_existing_schema(&mut schemas, &mut rows, existing_schema, table_name)?;
825
826 rows.iter_mut().for_each(|row| {
827 row.values.resize(schemas.len(), GreptimeValue::default());
828 });
829
830 Ok(Rows {
831 schema: schemas,
832 rows,
833 })
834}
835
836fn parse_resource(
837 parse_ctx: &mut ParseContext,
838 resource_logs_vec: Vec<ResourceLogs>,
839) -> Result<Vec<Row>> {
840 let total_len = resource_logs_vec
841 .iter()
842 .flat_map(|r| r.scope_logs.iter())
843 .map(|s| s.log_records.len())
844 .sum();
845
846 let mut results = Vec::with_capacity(total_len);
847
848 for r in resource_logs_vec {
849 parse_ctx.resource_attr = r
850 .resource
851 .map(|resource| key_value_to_jsonb(resource.attributes))
852 .unwrap_or(JsonbValue::Null);
853
854 parse_ctx.resource_url = r.schema_url;
855
856 parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
857 &parse_ctx.select_info,
858 &mut parse_ctx.select_schema,
859 &parse_ctx.resource_attr,
860 parse_ctx.existing_schema,
861 parse_ctx.table_name,
862 )?;
863
864 let rows = parse_scope(r.scope_logs, parse_ctx)?;
865 results.extend(rows);
866 }
867 Ok(results)
868}
869
870struct ParseContext<'a> {
871 select_info: Box<SelectInfo>,
873 existing_schema: Option<&'a ExistingLogSchema>,
874 table_name: &'a str,
875 select_schema: SchemaInfo,
878
879 resource_uplift_values: Vec<GreptimeValue>,
881 scope_uplift_values: Vec<GreptimeValue>,
882
883 resource_url: String,
885 resource_attr: JsonbValue<'a>,
886 scope_name: Option<String>,
887 scope_version: Option<String>,
888 scope_url: String,
889 scope_attrs: JsonbValue<'a>,
890}
891
892impl<'a> ParseContext<'a> {
893 pub fn new(
894 select_info: Box<SelectInfo>,
895 existing_schema: Option<&'a ExistingLogSchema>,
896 table_name: &'a str,
897 ) -> ParseContext<'a> {
898 let len = select_info.keys.len();
899 ParseContext {
900 select_info,
901 existing_schema,
902 table_name,
903 select_schema: SchemaInfo::with_capacity(len),
904 resource_uplift_values: vec![],
905 scope_uplift_values: vec![],
906 resource_url: String::new(),
907 resource_attr: JsonbValue::Null,
908 scope_name: None,
909 scope_version: None,
910 scope_url: String::new(),
911 scope_attrs: JsonbValue::Null,
912 }
913 }
914}
915
916fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
917 let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
918 let mut results = Vec::with_capacity(len);
919
920 for scope_logs in scopes_log_vec {
921 let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
922 parse_ctx.scope_name = scope_name;
923 parse_ctx.scope_version = scope_version;
924 parse_ctx.scope_url = scope_logs.schema_url;
925 parse_ctx.scope_attrs = scope_attrs;
926
927 parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
928 &parse_ctx.select_info,
929 &mut parse_ctx.select_schema,
930 &parse_ctx.scope_attrs,
931 parse_ctx.existing_schema,
932 parse_ctx.table_name,
933 )?;
934
935 let rows = parse_log(scope_logs.log_records, parse_ctx)?;
936 results.extend(rows);
937 }
938 Ok(results)
939}
940
941fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
942 let mut result = Vec::with_capacity(log_records.len());
943
944 for log in log_records {
945 let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
946
947 let log_values = extract_field_from_attr_and_combine_schema(
948 &parse_ctx.select_info,
949 &mut parse_ctx.select_schema,
950 &log_attr,
951 parse_ctx.existing_schema,
952 parse_ctx.table_name,
953 )?;
954
955 let extracted_values = merge_values(
956 log_values,
957 &parse_ctx.scope_uplift_values,
958 &parse_ctx.resource_uplift_values,
959 );
960
961 row.values.extend(extracted_values);
962
963 result.push(row);
964 }
965 Ok(result)
966}
967
968fn merge_values(
969 log: Vec<GreptimeValue>,
970 scope: &[GreptimeValue],
971 resource: &[GreptimeValue],
972) -> Vec<GreptimeValue> {
973 log.into_iter()
974 .enumerate()
975 .map(|(i, value)| GreptimeValue {
976 value_data: value
977 .value_data
978 .or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
979 .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
980 })
981 .collect()
982}
983
984fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec<VrlValue> {
987 let mut result = Vec::new();
988 for r in request.resource_logs {
989 let resource_attr = r
990 .resource
991 .map(|x| VrlValue::Object(key_value_to_map(x.attributes)))
992 .unwrap_or(VrlValue::Null);
993 let resource_schema_url = VrlValue::Bytes(r.schema_url.into());
994 for scope_logs in r.scope_logs {
995 let (scope_attrs, scope_version, scope_name) =
996 scope_to_pipeline_value(scope_logs.scope);
997 let scope_schema_url = VrlValue::Bytes(scope_logs.schema_url.into());
998 for log in scope_logs.log_records {
999 let value = log_to_pipeline_value(
1000 log,
1001 resource_schema_url.clone(),
1002 resource_attr.clone(),
1003 scope_schema_url.clone(),
1004 scope_name.clone(),
1005 scope_version.clone(),
1006 scope_attrs.clone(),
1007 );
1008 result.push(value);
1009 }
1010 }
1011 }
1012 result
1013}
1014
1015fn any_value_to_vrl_value(value: any_value::Value) -> VrlValue {
1017 match value {
1018 any_value::Value::StringValue(s) => VrlValue::Bytes(s.into()),
1019 any_value::Value::IntValue(i) => VrlValue::Integer(i),
1020 any_value::Value::DoubleValue(d) => VrlValue::Float(NotNan::new(d).unwrap()),
1021 any_value::Value::BoolValue(b) => VrlValue::Boolean(b),
1022 any_value::Value::ArrayValue(array_value) => {
1023 let values = array_value
1024 .values
1025 .into_iter()
1026 .filter_map(|v| v.value.map(any_value_to_vrl_value))
1027 .collect();
1028 VrlValue::Array(values)
1029 }
1030 any_value::Value::KvlistValue(key_value_list) => {
1031 VrlValue::Object(key_value_to_map(key_value_list.values))
1032 }
1033 any_value::Value::BytesValue(items) => VrlValue::Bytes(Bytes::from(items)),
1034 }
1035}
1036
1037fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<KeyString, VrlValue> {
1039 let mut map = BTreeMap::new();
1040 for kv in key_values {
1041 let value = match kv.value {
1042 Some(value) => match value.value {
1043 Some(value) => any_value_to_vrl_value(value),
1044 None => VrlValue::Null,
1045 },
1046 None => VrlValue::Null,
1047 };
1048 map.insert(kv.key.into(), value);
1049 }
1050 map
1051}
1052
1053fn log_body_to_string(body: &AnyValue) -> String {
1054 let otlp_value = OtlpAnyValue::from(body);
1055 otlp_value.to_string()
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use datatypes::prelude::ConcreteDataType;
1061 use datatypes::schema::ColumnSchema as DatatypesColumnSchema;
1062 use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
1063
1064 use super::*;
1065
1066 fn time_column(datatype: ConcreteDataType) -> DatatypesColumnSchema {
1067 DatatypesColumnSchema::new("timestamp", datatype, false).with_time_index(true)
1068 }
1069
1070 fn column(name: &str, datatype: ConcreteDataType) -> DatatypesColumnSchema {
1071 DatatypesColumnSchema::new(name, datatype, true)
1072 }
1073
1074 fn existing_schema(
1075 columns: Vec<DatatypesColumnSchema>,
1076 primary_key_indices: &[usize],
1077 ) -> ExistingLogSchema {
1078 ExistingLogSchema::try_from_schema_parts(&columns, primary_key_indices).unwrap()
1079 }
1080
1081 fn kv(key: &str, value: OtlpValue) -> KeyValue {
1082 KeyValue {
1083 key: key.to_string(),
1084 value: Some(AnyValue { value: Some(value) }),
1085 }
1086 }
1087
1088 fn request_with_log_attrs(attrs: Vec<KeyValue>) -> ExportLogsServiceRequest {
1089 ExportLogsServiceRequest {
1090 resource_logs: vec![ResourceLogs {
1091 scope_logs: vec![ScopeLogs {
1092 log_records: vec![LogRecord {
1093 time_unix_nano: 1_234_000_000,
1094 trace_id: vec![1; 16],
1095 attributes: attrs,
1096 ..Default::default()
1097 }],
1098 ..Default::default()
1099 }],
1100 ..Default::default()
1101 }],
1102 }
1103 }
1104
1105 fn parse_with_select(
1106 request: ExportLogsServiceRequest,
1107 select: &str,
1108 existing_schema: Option<&ExistingLogSchema>,
1109 ) -> Result<Rows> {
1110 parse_export_logs_service_request_to_rows(
1111 request,
1112 Box::new(SelectInfo::from(select.to_string())),
1113 existing_schema,
1114 "test_logs",
1115 )
1116 }
1117
1118 fn column_index(rows: &Rows, name: &str) -> usize {
1119 rows.schema
1120 .iter()
1121 .position(|schema| schema.column_name == name)
1122 .unwrap()
1123 }
1124
1125 #[test]
1126 fn test_no_existing_table_preserves_direct_schema() {
1127 let rows = parse_with_select(request_with_log_attrs(vec![]), "", None).unwrap();
1128
1129 assert_eq!(rows.schema[0].column_name, "timestamp");
1130 assert_eq!(
1131 rows.schema[0].datatype,
1132 ColumnDataType::TimestampNanosecond as i32
1133 );
1134 assert_eq!(rows.schema[0].semantic_type, SemanticType::Timestamp as i32);
1135 let scope_name_idx = column_index(&rows, "scope_name");
1136 assert_eq!(
1137 rows.schema[scope_name_idx].semantic_type,
1138 SemanticType::Tag as i32
1139 );
1140 }
1141
1142 #[test]
1143 fn test_existing_primary_key_updates_builtin_column_semantic_type() {
1144 let existing = existing_schema(
1145 vec![
1146 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1147 column("trace_id", ConcreteDataType::string_datatype()),
1148 ],
1149 &[1],
1150 );
1151
1152 let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();
1153 let trace_id_idx = column_index(&rows, "trace_id");
1154
1155 assert_eq!(
1156 rows.schema[trace_id_idx].semantic_type,
1157 SemanticType::Tag as i32
1158 );
1159 }
1160
1161 #[test]
1162 fn test_existing_string_primary_key_stringifies_selected_scalar_values() {
1163 let existing = existing_schema(
1164 vec![
1165 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1166 column("host", ConcreteDataType::string_datatype()),
1167 ],
1168 &[1],
1169 );
1170 let rows = parse_with_select(
1171 request_with_log_attrs(vec![kv("host", OtlpValue::IntValue(42))]),
1172 "host",
1173 Some(&existing),
1174 )
1175 .unwrap();
1176 let host_idx = column_index(&rows, "host");
1177
1178 assert_eq!(
1179 rows.schema[host_idx].datatype,
1180 ColumnDataType::String as i32
1181 );
1182 assert_eq!(
1183 rows.schema[host_idx].semantic_type,
1184 SemanticType::Tag as i32
1185 );
1186 assert_eq!(
1187 rows.rows[0].values[host_idx].value_data,
1188 Some(ValueData::StringValue("42".to_string()))
1189 );
1190 }
1191
1192 #[test]
1193 fn test_existing_string_field_stringifies_selected_scalar_values() {
1194 let existing = existing_schema(
1195 vec![
1196 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1197 column("host", ConcreteDataType::string_datatype()),
1198 ],
1199 &[],
1200 );
1201 let rows = parse_with_select(
1202 request_with_log_attrs(vec![kv("host", OtlpValue::IntValue(42))]),
1203 "host",
1204 Some(&existing),
1205 )
1206 .unwrap();
1207 let host_idx = column_index(&rows, "host");
1208
1209 assert_eq!(
1210 rows.schema[host_idx].datatype,
1211 ColumnDataType::String as i32
1212 );
1213 assert_eq!(
1214 rows.schema[host_idx].semantic_type,
1215 SemanticType::Field as i32
1216 );
1217 assert_eq!(
1218 rows.rows[0].values[host_idx].value_data,
1219 Some(ValueData::StringValue("42".to_string()))
1220 );
1221 }
1222
1223 #[test]
1224 fn test_existing_non_string_primary_key_rejects_incompatible_selected_value() {
1225 let existing = existing_schema(
1226 vec![
1227 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1228 column("host", ConcreteDataType::int64_datatype()),
1229 ],
1230 &[1],
1231 );
1232 let err = parse_with_select(
1233 request_with_log_attrs(vec![kv(
1234 "host",
1235 OtlpValue::StringValue("node-a".to_string()),
1236 )]),
1237 "host",
1238 Some(&existing),
1239 )
1240 .unwrap_err();
1241
1242 assert!(
1243 err.to_string()
1244 .contains("failed to align log column 'host'")
1245 );
1246 }
1247
1248 #[test]
1249 fn test_existing_timestamp_unit_is_respected() {
1250 let existing = existing_schema(
1251 vec![time_column(
1252 ConcreteDataType::timestamp_millisecond_datatype(),
1253 )],
1254 &[],
1255 );
1256 let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();
1257
1258 assert_eq!(
1259 rows.schema[0].datatype,
1260 ColumnDataType::TimestampMillisecond as i32
1261 );
1262 assert_eq!(
1263 rows.rows[0].values[0].value_data,
1264 Some(ValueData::TimestampMillisecondValue(1234))
1265 );
1266 }
1267
1268 #[test]
1269 fn test_missing_existing_primary_key_is_not_generated() {
1270 let existing = existing_schema(
1271 vec![
1272 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1273 column("host", ConcreteDataType::string_datatype()),
1274 ],
1275 &[1],
1276 );
1277 let rows = parse_with_select(request_with_log_attrs(vec![]), "", Some(&existing)).unwrap();
1278
1279 assert!(
1280 !rows
1281 .schema
1282 .iter()
1283 .any(|schema| schema.column_name == "host")
1284 );
1285 }
1286
1287 #[test]
1288 fn test_existing_table_keeps_new_generated_columns_as_fields() {
1289 let existing = existing_schema(
1290 vec![
1291 time_column(ConcreteDataType::timestamp_nanosecond_datatype()),
1292 column("trace_id", ConcreteDataType::string_datatype()),
1293 ],
1294 &[1],
1295 );
1296 let rows = parse_with_select(
1297 request_with_log_attrs(vec![kv(
1298 "host",
1299 OtlpValue::StringValue("node-a".to_string()),
1300 )]),
1301 "host",
1302 Some(&existing),
1303 )
1304 .unwrap();
1305 let host_idx = column_index(&rows, "host");
1306 let scope_name_idx = column_index(&rows, "scope_name");
1307
1308 assert_eq!(
1309 rows.schema[host_idx].semantic_type,
1310 SemanticType::Field as i32
1311 );
1312 assert_eq!(
1313 rows.schema[scope_name_idx].semantic_type,
1314 SemanticType::Field as i32
1315 );
1316 }
1317}