1#![allow(dead_code)]
16pub mod field;
17pub mod processor;
18pub mod transform;
19pub mod value;
20
21use ahash::{HashMap, HashMapExt};
22use api::v1::Row;
23use common_time::timestamp::TimeUnit;
24use processor::{Processor, Processors};
25use snafu::{ensure, OptionExt, ResultExt};
26use transform::Transforms;
27use value::Value;
28use yaml_rust::YamlLoader;
29
30use crate::dispatcher::{Dispatcher, Rule};
31use crate::error::{
32 IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result,
33 TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
34};
35use crate::etl::processor::ProcessorKind;
36use crate::tablesuffix::TableSuffixTemplate;
37use crate::GreptimeTransformer;
38
// Top-level keys recognized in a pipeline YAML definition document.
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";

/// Intermediate representation of one row/event flowing through the pipeline:
/// an ordered map from field name to [`Value`].
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
47
/// Raw pipeline definition text, tagged with its serialization format.
pub enum Content<'a> {
    /// JSON-encoded pipeline definition (parsing not implemented yet).
    Json(&'a str),
    /// YAML-encoded pipeline definition.
    Yaml(&'a str),
}
52
53pub fn parse(input: &Content) -> Result<Pipeline> {
54 match input {
55 Content::Yaml(str) => {
56 let docs = YamlLoader::load_from_str(str).context(YamlLoadSnafu)?;
57
58 ensure!(docs.len() == 1, YamlParseSnafu);
59
60 let doc = &docs[0];
61
62 let description = doc[DESCRIPTION].as_str().map(|s| s.to_string());
63
64 let processors = if let Some(v) = doc[PROCESSORS].as_vec() {
65 v.try_into()?
66 } else {
67 Processors::default()
68 };
69
70 let transformers = if let Some(v) = doc[TRANSFORMS].as_vec().or(doc[TRANSFORM].as_vec())
71 {
72 v.try_into()?
73 } else {
74 Transforms::default()
75 };
76
77 let transformer = if transformers.is_empty() {
78 let cnt = processors
81 .iter()
82 .filter(|p| {
83 matches!(
84 p,
85 ProcessorKind::Date(_)
86 | ProcessorKind::Timestamp(_)
87 | ProcessorKind::Epoch(_)
88 )
89 })
90 .count();
91 ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
92 None
93 } else {
94 Some(GreptimeTransformer::new(transformers)?)
95 };
96
97 let dispatcher = if !doc[DISPATCHER].is_badvalue() {
98 Some(Dispatcher::try_from(&doc[DISPATCHER])?)
99 } else {
100 None
101 };
102
103 let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
104 Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
105 } else {
106 None
107 };
108
109 Ok(Pipeline {
110 description,
111 processors,
112 transformer,
113 dispatcher,
114 tablesuffix,
115 })
116 }
117 Content::Json(_) => unimplemented!(),
118 }
119}
120
/// A parsed, executable pipeline: a processor chain plus optional
/// dispatching, transformation, and table-suffix rules.
#[derive(Debug)]
pub struct Pipeline {
    /// Optional human-readable description from the `description` key.
    description: Option<String>,
    /// Processors applied, in order, to every incoming value.
    processors: processor::Processors,
    /// Optional dispatcher that can route a value to another table/pipeline.
    dispatcher: Option<Dispatcher>,
    /// Optional transformer; when `None`, auto-transform output is produced.
    transformer: Option<GreptimeTransformer>,
    /// Optional template computing a per-row table-name suffix.
    tablesuffix: Option<TableSuffixTemplate>,
}
129
/// Destination produced when a dispatcher rule matches: the target table
/// suffix and, optionally, the name of the next pipeline to run.
#[derive(Debug, Hash, PartialEq, Eq, Clone, PartialOrd, Ord)]
pub struct DispatchedTo {
    pub table_suffix: String,
    pub pipeline: Option<String>,
}
136
137impl From<&Rule> for DispatchedTo {
138 fn from(value: &Rule) -> Self {
139 DispatchedTo {
140 table_suffix: value.table_suffix.clone(),
141 pipeline: value.pipeline.clone(),
142 }
143 }
144}
145
146impl DispatchedTo {
147 pub fn dispatched_to_table_name(&self, original: &str) -> String {
149 format!("{}_{}", &original, self.table_suffix)
150 }
151}
152
/// Result of executing a pipeline over one value.
#[derive(Debug)]
pub enum PipelineExecOutput {
    /// A fully transformed row, plus an optional table-name suffix.
    Transformed((Row, Option<String>)),
    /// No transformer configured: the optional table suffix and, for each
    /// timestamp-valued field, its time unit (used for auto-transform).
    AutoTransform(Option<String>, HashMap<String, TimeUnit>),
    /// A dispatcher rule matched; the value should be routed elsewhere.
    DispatchedTo(DispatchedTo),
}
161
162impl PipelineExecOutput {
163 pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
164 if let Self::Transformed(o) = self {
165 Some(o)
166 } else {
167 None
168 }
169 }
170
171 pub fn into_dispatched(self) -> Option<DispatchedTo> {
172 if let Self::DispatchedTo(d) = self {
173 Some(d)
174 } else {
175 None
176 }
177 }
178}
179
180pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
181 match val {
182 serde_json::Value::Object(map) => {
183 let mut intermediate_state = PipelineMap::new();
184 for (k, v) in map {
185 intermediate_state.insert(k, Value::try_from(v)?);
186 }
187 Ok(intermediate_state)
188 }
189 _ => PrepareValueMustBeObjectSnafu.fail(),
190 }
191}
192
193pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>> {
194 val.into_iter().map(json_to_map).collect()
195}
196
197pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
198 match val {
199 simd_json::OwnedValue::Object(map) => {
200 let mut intermediate_state = PipelineMap::new();
201 for (k, v) in map.into_iter() {
202 intermediate_state.insert(k, Value::try_from(v)?);
203 }
204 Ok(intermediate_state)
205 }
206 _ => PrepareValueMustBeObjectSnafu.fail(),
207 }
208}
209
210pub fn simd_json_array_to_map(val: Vec<simd_json::OwnedValue>) -> Result<Vec<PipelineMap>> {
211 val.into_iter().map(simd_json_to_map).collect()
212}
213
214impl Pipeline {
215 pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
216 for processor in self.processors.iter() {
218 processor.exec_mut(val)?;
219 }
220
221 if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
223 return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
224 }
225
226 if let Some(transformer) = self.transformer() {
227 let row = transformer.transform_mut(val)?;
228 let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
229 Ok(PipelineExecOutput::Transformed((row, table_suffix)))
230 } else {
231 let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
232 let mut ts_unit_map = HashMap::with_capacity(4);
233 for (k, v) in val {
235 if let Value::Timestamp(ts) = v {
236 if !ts_unit_map.contains_key(k) {
237 ts_unit_map.insert(k.clone(), ts.get_unit());
238 }
239 }
240 }
241 Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map))
242 }
243 }
244
245 pub fn processors(&self) -> &processor::Processors {
246 &self.processors
247 }
248
249 pub fn transformer(&self) -> Option<&GreptimeTransformer> {
250 self.transformer.as_ref()
251 }
252
253 pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
254 self.transformer.as_ref().map(|t| t.schemas())
255 }
256}
257
258pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
259 intermediate_keys
260 .iter()
261 .position(|k| k == key)
262 .context(IntermediateKeyIndexSnafu { kind, key })
263}
264
265#[cfg(test)]
266mod tests {
267 use api::v1::Rows;
268 use greptime_proto::v1::value::ValueData;
269 use greptime_proto::v1::{self, ColumnDataType, SemanticType};
270
271 use super::*;
272
273 #[test]
274 fn test_pipeline_prepare() {
275 let input_value_str = r#"
276 {
277 "my_field": "1,2",
278 "foo": "bar"
279 }
280 "#;
281 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
282
283 let pipeline_yaml = r#"description: 'Pipeline for Apache Tomcat'
284processors:
285 - csv:
286 field: my_field
287 target_fields: field1, field2
288transform:
289 - field: field1
290 type: uint32
291 - field: field2
292 type: uint32
293 "#;
294 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
295 let mut payload = json_to_map(input_value).unwrap();
296 let result = pipeline
297 .exec_mut(&mut payload)
298 .unwrap()
299 .into_transformed()
300 .unwrap();
301
302 assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
303 assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
304 match &result.0.values[2].value_data {
305 Some(ValueData::TimestampNanosecondValue(v)) => {
306 assert_ne!(*v, 0);
307 }
308 _ => panic!("expect null value"),
309 }
310 }
311
312 #[test]
313 fn test_dissect_pipeline() {
314 let message = r#"129.37.245.88 - meln1ks [01/Aug/2024:14:22:47 +0800] "PATCH /observability/metrics/production HTTP/1.0" 501 33085"#.to_string();
315 let pipeline_str = r#"processors:
316 - dissect:
317 fields:
318 - message
319 patterns:
320 - "%{ip} %{?ignored} %{username} [%{ts}] \"%{method} %{path} %{proto}\" %{status} %{bytes}"
321 - timestamp:
322 fields:
323 - ts
324 formats:
325 - "%d/%b/%Y:%H:%M:%S %z"
326
327transform:
328 - fields:
329 - ip
330 - username
331 - method
332 - path
333 - proto
334 type: string
335 - fields:
336 - status
337 type: uint16
338 - fields:
339 - bytes
340 type: uint32
341 - field: ts
342 type: timestamp, ns
343 index: time"#;
344 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap();
345 let mut payload = PipelineMap::new();
346 payload.insert("message".to_string(), Value::String(message));
347 let result = pipeline
348 .exec_mut(&mut payload)
349 .unwrap()
350 .into_transformed()
351 .unwrap();
352 let sechema = pipeline.schemas().unwrap();
353
354 assert_eq!(sechema.len(), result.0.values.len());
355 let test = vec![
356 (
357 ColumnDataType::String as i32,
358 Some(ValueData::StringValue("129.37.245.88".into())),
359 ),
360 (
361 ColumnDataType::String as i32,
362 Some(ValueData::StringValue("meln1ks".into())),
363 ),
364 (
365 ColumnDataType::String as i32,
366 Some(ValueData::StringValue("PATCH".into())),
367 ),
368 (
369 ColumnDataType::String as i32,
370 Some(ValueData::StringValue(
371 "/observability/metrics/production".into(),
372 )),
373 ),
374 (
375 ColumnDataType::String as i32,
376 Some(ValueData::StringValue("HTTP/1.0".into())),
377 ),
378 (
379 ColumnDataType::Uint16 as i32,
380 Some(ValueData::U16Value(501)),
381 ),
382 (
383 ColumnDataType::Uint32 as i32,
384 Some(ValueData::U32Value(33085)),
385 ),
386 (
387 ColumnDataType::TimestampNanosecond as i32,
388 Some(ValueData::TimestampNanosecondValue(1722493367000000000)),
389 ),
390 ];
391 for i in 0..sechema.len() {
392 let schema = &sechema[i];
393 let value = &result.0.values[i];
394 assert_eq!(schema.datatype, test[i].0);
395 assert_eq!(value.value_data, test[i].1);
396 }
397 }
398
399 #[test]
400 fn test_csv_pipeline() {
401 let input_value_str = r#"
402 {
403 "my_field": "1,2",
404 "foo": "bar"
405 }
406 "#;
407 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
408
409 let pipeline_yaml = r#"
410 description: Pipeline for Apache Tomcat
411 processors:
412 - csv:
413 field: my_field
414 target_fields: field1, field2
415 transform:
416 - field: field1
417 type: uint32
418 - field: field2
419 type: uint32
420 "#;
421
422 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
423 let mut payload = json_to_map(input_value).unwrap();
424 let result = pipeline
425 .exec_mut(&mut payload)
426 .unwrap()
427 .into_transformed()
428 .unwrap();
429 assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
430 assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
431 match &result.0.values[2].value_data {
432 Some(ValueData::TimestampNanosecondValue(v)) => {
433 assert_ne!(*v, 0);
434 }
435 _ => panic!("expect null value"),
436 }
437 }
438
439 #[test]
440 fn test_date_pipeline() {
441 let input_value_str = r#"
442 {
443 "my_field": "1,2",
444 "foo": "bar",
445 "test_time": "2014-5-17T04:34:56+00:00"
446 }
447 "#;
448 let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
449
450 let pipeline_yaml = r#"---
451description: Pipeline for Apache Tomcat
452
453processors:
454 - timestamp:
455 field: test_time
456
457transform:
458 - field: test_time
459 type: timestamp, ns
460 index: time
461 "#;
462
463 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
464 let schema = pipeline.schemas().unwrap().clone();
465 let mut result = json_to_map(input_value).unwrap();
466
467 let row = pipeline
468 .exec_mut(&mut result)
469 .unwrap()
470 .into_transformed()
471 .unwrap();
472 let output = Rows {
473 schema,
474 rows: vec![row.0],
475 };
476 let schemas = output.schema;
477
478 assert_eq!(schemas.len(), 1);
479 let schema = schemas[0].clone();
480 assert_eq!("test_time", schema.column_name);
481 assert_eq!(ColumnDataType::TimestampNanosecond as i32, schema.datatype);
482 assert_eq!(SemanticType::Timestamp as i32, schema.semantic_type);
483
484 let row = output.rows[0].clone();
485 assert_eq!(1, row.values.len());
486 let value_data = row.values[0].clone().value_data;
487 assert_eq!(
488 Some(v1::value::ValueData::TimestampNanosecondValue(
489 1400301296000000000
490 )),
491 value_data
492 );
493 }
494
495 #[test]
496 fn test_dispatcher() {
497 let pipeline_yaml = r#"
498---
499description: Pipeline for Apache Tomcat
500
501processors:
502
503dispatcher:
504 field: typename
505 rules:
506 - value: http
507 table_suffix: http_events
508 - value: database
509 table_suffix: db_events
510 pipeline: database_pipeline
511
512transform:
513 - field: typename
514 type: string
515
516"#;
517 let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
518 let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
519 assert_eq!(dispatcher.field, "typename");
520
521 assert_eq!(dispatcher.rules.len(), 2);
522
523 assert_eq!(
524 dispatcher.rules[0],
525 crate::dispatcher::Rule {
526 value: Value::String("http".to_string()),
527 table_suffix: "http_events".to_string(),
528 pipeline: None
529 }
530 );
531
532 assert_eq!(
533 dispatcher.rules[1],
534 crate::dispatcher::Rule {
535 value: Value::String("database".to_string()),
536 table_suffix: "db_events".to_string(),
537 pipeline: Some("database_pipeline".to_string()),
538 }
539 );
540
541 let bad_yaml1 = r#"
542---
543description: Pipeline for Apache Tomcat
544
545processors:
546
547dispatcher:
548 _field: typename
549 rules:
550 - value: http
551 table_suffix: http_events
552 - value: database
553 table_suffix: db_events
554 pipeline: database_pipeline
555
556transform:
557 - field: typename
558 type: string
559
560"#;
561 let bad_yaml2 = r#"
562---
563description: Pipeline for Apache Tomcat
564
565processors:
566
567dispatcher:
568 field: typename
569 rules:
570 - value: http
571 _table_suffix: http_events
572 - value: database
573 _table_suffix: db_events
574 pipeline: database_pipeline
575
576transform:
577 - field: typename
578 type: string
579
580"#;
581 let bad_yaml3 = r#"
582---
583description: Pipeline for Apache Tomcat
584
585processors:
586
587dispatcher:
588 field: typename
589 rules:
590 - _value: http
591 table_suffix: http_events
592 - _value: database
593 table_suffix: db_events
594 pipeline: database_pipeline
595
596transform:
597 - field: typename
598 type: string
599
600"#;
601
602 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml1));
603 assert!(r.is_err());
604 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml2));
605 assert!(r.is_err());
606 let r: Result<Pipeline> = parse(&Content::Yaml(bad_yaml3));
607 assert!(r.is_err());
608 }
609}