#![allow(dead_code)]

pub mod ctx_req;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;

use ahash::{HashMap, HashMapExt};
use api::v1::Row;
use common_time::timestamp::TimeUnit;
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::Transforms;
use value::Value;
use yaml_rust::YamlLoader;

use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
    AutoTransformOneTimestampSnafu, InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
    YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::ctx_req::TABLE_SUFFIX_KEY;
use crate::etl::processor::ProcessorKind;
use crate::tablesuffix::TableSuffixTemplate;
use crate::{ContextOpt, GreptimeTransformer};

// Top-level keys recognized in a YAML pipeline definition.
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";

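/// Intermediate representation of a single record as it flows through the
/// pipeline: an ordered map from field name to [`Value`].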
pub type PipelineMap = std::collections::BTreeMap<String, Value>;

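/// A raw pipeline definition. Only YAML is currently supported; the JSON
/// variant is a placeholder (see [`parse`]).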
pub enum Content<'a> {
    Json(&'a str),
    Yaml(&'a str),
}

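/// Parses a pipeline definition into a [`Pipeline`].
///
/// A minimal sketch of a supported YAML document (the shape mirrors the tests
/// at the bottom of this file; available processor options depend on the
/// individual processor implementations):
///
/// ```ignore
/// let yaml = r#"
/// description: example pipeline
/// processors:
///   - timestamp:
///       field: ts
/// transform:
///   - field: ts
///     type: timestamp, ns
///     index: time
/// "#;
/// let pipeline = parse(&Content::Yaml(yaml))?;
/// ```
///
/// When no `transform`/`transforms` section is present, the pipeline runs in
/// auto-transform mode, which requires exactly one timestamp-producing
/// processor (`date`, `timestamp`, or `epoch`).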
pub fn parse(input: &Content) -> Result<Pipeline> {
    match input {
        Content::Yaml(str) => {
            let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;

            // A pipeline definition must be exactly one YAML document.
            ensure!(docs.len() == 1, YamlParseSnafu);

            let doc = &docs[0];

            let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());

            let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
                v.try_into()?
            } else {
                Processors::default()
            };

            let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
            {
                v.try_into()?
            } else {
                Transforms::default()
            };

            let transformer = if transformers.is_empty() {
                // Auto-transform mode: exactly one processor must produce a
                // timestamp field, which becomes the time index.
                let cnt = processors
                    .iter()
                    .filter_map(|p| match p {
                        ProcessorKind::Date(d) => Some(d.target_count()),
                        ProcessorKind::Timestamp(t) => Some(t.target_count()),
                        ProcessorKind::Epoch(e) => Some(e.target_count()),
                        _ => None,
                    })
                    .sum::<usize>();
                ensure!(cnt == 1, AutoTransformOneTimestampSnafu);
                None
            } else {
                Some(GreptimeTransformer::new(transformers)?)
            };

            let dispatcher = if !doc[DISPATCHER].is_badvalue() {
                Some(Dispatcher::try_from(&doc[DISPATCHER])?)
            } else {
                None
            };

            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
            } else {
                None
            };

            Ok(Pipeline {
                description,
                processors,
                transformer,
                dispatcher,
                tablesuffix,
            })
        }
        Content::Json(_) => unimplemented!(),
    }
}

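/// A parsed pipeline: a chain of processors, an optional transformer that maps
/// intermediate values to typed columns, an optional dispatcher that routes
/// records to other tables or pipelines, and an optional table-suffix template.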
#[derive(Debug)]
pub struct Pipeline {
    description: Option<String>,
    processors: processor::Processors,
    dispatcher: Option<Dispatcher>,
    transformer: Option<GreptimeTransformer>,
    tablesuffix: Option<TableSuffixTemplate>,
}

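/// The route chosen by the dispatcher: a table suffix plus the name of the
/// next pipeline to run, if any.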
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    pub table_suffix: String,
    pub pipeline: Option<String>,
}

impl From<&Rule> for DispatchedTo {
    fn from(value: &Rule) -> Self {
        DispatchedTo {
            table_suffix: value.table_suffix.clone(),
            pipeline: value.pipeline.clone(),
        }
    }
}

impl DispatchedTo {
    /// Derives the destination table name from the original one.
    pub fn dispatched_to_table_name(&self, original: &str) -> String {
        format!("{}_{}", original, self.table_suffix)
    }
}

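/// The result of executing a pipeline over one record.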
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// The transform section produced a typed row.
    Transformed(TransformedOutput),
    /// No transform section: values pass through with inferred timestamp units.
    AutoTransform(AutoTransformOutput),
    /// A dispatcher rule matched; the record is handed off with its state.
    DispatchedTo(DispatchedTo, PipelineMap),
}

#[derive(Debug)]
pub struct TransformedOutput {
    pub opt: ContextOpt,
    pub row: Row,
    pub table_suffix: Option<String>,
    pub pipeline_map: PipelineMap,
}

#[derive(Debug)]
pub struct AutoTransformOutput {
    pub table_suffix: Option<String>,
    /// Time unit of each timestamp field, keyed by field name.
    pub ts_unit_map: HashMap<String, TimeUnit>,
    pub pipeline_map: PipelineMap,
}

impl PipelineExecOutput {
    /// Returns the row and table suffix if the record was transformed.
    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
        if let Self::Transformed(TransformedOutput {
            row, table_suffix, ..
        }) = self
        {
            Some((row, table_suffix))
        } else {
            None
        }
    }

    /// Returns the dispatch target if a dispatcher rule matched.
    pub fn into_dispatched(self) -> Option<DispatchedTo> {
        if let Self::DispatchedTo(d, _) = self {
            Some(d)
        } else {
            None
        }
    }
}

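/// Converts a `serde_json` object into a [`PipelineMap`], failing on
/// non-object input.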
pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
    match val {
        serde_json::Value::Object(map) => {
            let mut intermediate_state = PipelineMap::new();
            for (k, v) in map {
                intermediate_state.insert(k, Value::try_from(v)?);
            }
            Ok(intermediate_state)
        }
        _ => InputValueMustBeObjectSnafu.fail(),
    }
}

pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
    val.into_iter().map(json_to_map).collect()
}

/// Same as [`json_to_map`], but for `simd_json` values.
pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
    match val {
        simd_json::OwnedValue::Object(map) => {
            let mut intermediate_state = PipelineMap::new();
            for (k, v) in map.into_iter() {
                intermediate_state.insert(k, Value::try_from(v)?);
            }
            Ok(intermediate_state)
        }
        _ => InputValueMustBeObjectSnafu.fail(),
    }
}

pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
    val.into_iter().map(simd_json_to_map).collect()
}

impl Pipeline {
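    /// Runs the record through all processors, then either dispatches it,
    /// transforms it into a typed row, or (in auto-transform mode) infers
    /// timestamp units and passes the values through.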
    pub fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineExecOutput> {
        // Run every processor in order over the record.
        for processor in self.processors.iter() {
            val = processor.exec_mut(val)?;
        }

        // A matching dispatcher rule short-circuits execution.
        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(&val)) {
            return Ok(PipelineExecOutput::DispatchedTo(rule.into(), val));
        }

        if let Some(transformer) = self.transformer() {
            let (mut opt, row) = transformer.transform_mut(&mut val)?;
            let table_suffix = opt.resolve_table_suffix(self.tablesuffix.as_ref(), &val);

            Ok(PipelineExecOutput::Transformed(TransformedOutput {
                opt,
                row,
                table_suffix,
                pipeline_map: val,
            }))
        } else {
            // Auto-transform mode: take the table suffix from the context key
            // if present, otherwise from the table-suffix template.
            let table_suffix = val
                .remove(TABLE_SUFFIX_KEY)
                .map(|f| f.to_str_value())
                .or_else(|| self.tablesuffix.as_ref().and_then(|t| t.apply(&val)));

            // Record the time unit of each timestamp field.
            let mut ts_unit_map = HashMap::with_capacity(4);
            for (k, v) in val.iter() {
                if let Value::Timestamp(ts) = v {
                    if !ts_unit_map.contains_key(k) {
                        ts_unit_map.insert(k.clone(), ts.get_unit());
                    }
                }
            }
            Ok(PipelineExecOutput::AutoTransform(AutoTransformOutput {
                table_suffix,
                ts_unit_map,
                pipeline_map: val,
            }))
        }
    }

    pub fn processors(&self) -> &processor::Processors {
        &self.processors
    }

    pub fn transformer(&self) -> Option<&GreptimeTransformer> {
        self.transformer.as_ref()
    }

    pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
        self.transformer.as_ref().map(|t| t.schemas())
    }
}

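/// Finds the position of `key` in `intermediate_keys`, reporting `kind` in the
/// error when the key is missing.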
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
    intermediate_keys
        .iter()
        .position(|k| k == key)
        .context(IntermediateKeyIndexSnafu { kind, key })
}

#[cfg(test)]
mod tests {
    use api::v1::Rows;
    use greptime_proto::v1::value::ValueData;
    use greptime_proto::v1::{self, ColumnDataType, SemanticType};

    use super::*;

    #[test]
    fn test_pipeline_prepare() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();

        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            _ => panic!("expect timestamp value"),
        }
    }

    #[test]
    fn test_dissect_pipeline() {
        let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
        let pipeline_str = r#"processors:
  - dissect:
      fields:
        - message
      patterns:
        - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
  - timestamp:
      fields:
        - ts
      formats:
        - "%d/%b/%Y:%H:%M:%S %z"

transform:
  - fields:
      - ip
      - username
      - method
      - path
      - proto
    type: string
  - fields:
      - status
    type: uint16
  - fields:
      - bytes
    type: uint32
  - field: ts
    type: timestamp, ns
    index: time"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
        let mut payload = PipelineMap::new();
        payload.insert("message".to_string(), Value::String(message));
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        let schemas = pipeline.schemas().unwrap();

        assert_eq!(schemas.len(), result.0.values.len());
        let test = vec![
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("129.37.245.88".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("meln1ks".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("PATCH".into())),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue(
                    "/observability/metrics/production".into(),
                )),
            ),
            (
                ColumnDataType::String as i32,
                Some(ValueData::StringValue("HTTP/1.0".into())),
            ),
            (
                ColumnDataType::Uint16 as i32,
                Some(ValueData::U16Value(501)),
            ),
            (
                ColumnDataType::Uint32 as i32,
                Some(ValueData::U32Value(33085)),
            ),
            (
                ColumnDataType::TimestampNanosecond as i32,
                Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
            ),
        ];
        for i in 0..schemas.len() {
            let schema = &schemas[i];
            let value = &result.0.values[i];
            assert_eq!(schema.datatype, test[i].0);
            assert_eq!(value.value_data, test[i].1);
        }
    }

    #[test]
    fn test_csv_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"
description: Pipeline for Apache Tomcat
processors:
  - csv:
      field: my_field
      target_fields: field1, field2
transform:
  - field: field1
    type: uint32
  - field: field2
    type: uint32
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let payload = json_to_map(input_value).unwrap();
        let result = pipeline
            .exec_mut(payload)
            .unwrap()
            .into_transformed()
            .unwrap();
        assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
        assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
        match &result.0.values[2].value_data {
            Some(ValueData::TimestampNanosecondValue(v)) => {
                assert_ne!(*v, 0);
            }
            _ => panic!("expect timestamp value"),
        }
    }

    #[test]
    fn test_date_pipeline() {
        let input_value_str = r#"
            {
                "my_field": "1,2",
                "foo": "bar",
                "test_time": "2014-5-17T04:34:56+00:00"
            }
        "#;
        let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

        let pipeline_yaml = r#"---
description: Pipeline for Apache Tomcat

processors:
  - timestamp:
      field: test_time

transform:
  - field: test_time
    type: timestamp, ns
    index: time
"#;

        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let schema = pipeline.schemas().unwrap().clone();
        let result = json_to_map(input_value).unwrap();

        let row = pipeline
            .exec_mut(result)
            .unwrap()
            .into_transformed()
            .unwrap();
        let output = Rows {
            schema,
            rows: vec![row.0],
        };
        let schemas = output.schema;

        assert_eq!(schemas.len(), 1);
        let schema = schemas[0].clone();
        assert_eq!("test_time", schema.column_name);
        assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
        assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);

        let row = output.rows[0].clone();
        assert_eq!(1, row.values.len());
        let value_data = row.values[0].clone().value_data;
        assert_eq!(
            Some(v1::value::ValueData::TimestampNanosecondValue(
                1400301296000000000
            )),
            value_data
        );
    }

    #[test]
    fn test_dispatcher() {
        let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
"#;
        let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
        let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
        assert_eq!(dispatcher.field, "typename");

        assert_eq!(dispatcher.rules.len(), 2);

        assert_eq!(
            dispatcher.rules[0],
            crate::dispatcher::Rule {
                value: Value::String("http".to_string()),
                table_suffix: "http_events".to_string(),
                pipeline: None
            }
        );

        assert_eq!(
            dispatcher.rules[1],
            crate::dispatcher::Rule {
                value: Value::String("database".to_string()),
                table_suffix: "db_events".to_string(),
                pipeline: Some("database_pipeline".to_string()),
            }
        );

        // Each of the following definitions misspells a required dispatcher
        // key (`field`, `table_suffix`, `value`) and must fail to parse.
        let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  _field: typename
  rules:
    - value: http
      table_suffix: http_events
    - value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
"#;
        let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - value: http
      _table_suffix: http_events
    - value: database
      _table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
"#;
        let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat

processors:

dispatcher:
  field: typename
  rules:
    - _value: http
      table_suffix: http_events
    - _value: database
      table_suffix: db_events
      pipeline: database_pipeline

transform:
  - field: typename
    type: string
"#;

        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
        assert!(r.is_err());
        let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
        assert!(r.is_err());
    }
}
645}