pipeline/etl/processor/
dissect.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use itertools::Itertools;
19use snafu::OptionExt;
20use vrl::prelude::Bytes;
21use vrl::value::{KeyString, Value as VrlValue};
22
23use crate::error::{
24    DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
25    DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
26    DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,
27    DissectOrderOnlyAppendSnafu, DissectSplitExceedsInputSnafu, DissectSplitNotMatchInputSnafu,
28    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
29    ValueMustBeMapSnafu,
30};
31use crate::etl::field::Fields;
32use crate::etl::processor::{
33    yaml_bool, yaml_new_field, yaml_new_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
34    Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
35};
36
/// Processor kind identifier; this is the value returned by `DissectProcessor::kind`.
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";

/// Configuration key whose string value becomes the processor's `append_separator`.
const APPEND_SEPARATOR_NAME: &str = "append_separator";
40
/// Modifier that may prefix the name inside a `%{...}` field.
#[derive(Debug, PartialEq)]
enum StartModifier {
    /// `+`, optionally followed by `/N`: the matched value is appended to the
    /// values of other fields with the same name; `N` is the sort order.
    Append(Option<u32>),
    /// `?`: the field is matched but its value is not emitted.
    NamedSkip,
    /// `*`: map-key field; must be paired with a `&` field of the same name.
    MapKey,
    /// `&`: map-value field; must be paired with a `*` field of the same name.
    MapVal,
}

impl std::fmt::Display for StartModifier {
    /// Writes the modifier the way it is spelled inside a pattern.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Append(Some(order)) => write!(f, "+/{order}"),
            Self::Append(None) => f.write_str("+"),
            Self::NamedSkip => f.write_str("?"),
            Self::MapKey => f.write_str("*"),
            Self::MapVal => f.write_str("&"),
        }
    }
}
62
/// Marker for the `->` modifier that may close a `%{...}` field name.
#[derive(Debug, PartialEq)]
struct EndModifier;

impl std::fmt::Display for EndModifier {
    /// Writes the modifier the way it is spelled inside a pattern.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("->")
    }
}
71
72impl Name {
73    fn is_name_empty(&self) -> bool {
74        self.name.is_empty()
75    }
76
77    fn is_empty(&self) -> bool {
78        self.name.is_empty() && self.start_modifier.is_none() && self.end_modifier.is_none()
79    }
80
81    fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<()> {
82        match &self.start_modifier {
83            Some(m) => DissectModifierAlreadySetSnafu {
84                m: m.to_string(),
85                modifier: modifier.to_string(),
86            }
87            .fail(),
88            None => {
89                self.start_modifier = Some(modifier);
90                Ok(())
91            }
92        }
93    }
94
95    fn try_append_order(&mut self, order: u32) -> Result<()> {
96        match &mut self.start_modifier {
97            Some(StartModifier::Append(o)) => match o {
98                Some(n) => DissectAppendOrderAlreadySetSnafu {
99                    n: n.to_string(),
100                    order,
101                }
102                .fail(),
103                None => {
104                    *o = Some(order);
105                    Ok(())
106                }
107            },
108            Some(m) => DissectOrderOnlyAppendSnafu { m: m.to_string() }.fail(),
109            None => DissectOrderOnlyAppendModifierSnafu.fail(),
110        }
111    }
112
113    fn try_end_modifier(&mut self) -> Result<()> {
114        match &self.end_modifier {
115            Some(m) => DissectEndModifierAlreadySetSnafu { m: m.to_string() }.fail(),
116            None => {
117                self.end_modifier = Some(EndModifier);
118                Ok(())
119            }
120        }
121    }
122
123    fn is_append_modifier_set(&self) -> bool {
124        matches!(self.start_modifier, Some(StartModifier::Append(_)))
125    }
126
127    fn is_start_modifier_set(&self) -> bool {
128        self.start_modifier.is_some()
129    }
130
131    fn is_end_modifier_set(&self) -> bool {
132        self.end_modifier.is_some()
133    }
134}
135
136#[derive(Debug, PartialEq, Default)]
137struct Name {
138    name: String,
139    start_modifier: Option<StartModifier>,
140    end_modifier: Option<EndModifier>,
141}
142
143impl std::fmt::Display for Name {
144    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
145        write!(f, "{}", self.name)
146    }
147}
148
149impl From<&str> for Name {
150    fn from(value: &str) -> Self {
151        Name {
152            name: value.to_string(),
153            start_modifier: None,
154            end_modifier: None,
155        }
156    }
157}
158
159#[derive(Debug, PartialEq)]
160enum Part {
161    Split(String),
162    Name(Name),
163}
164
165impl Part {
166    fn is_empty(&self) -> bool {
167        match self {
168            Part::Split(v) => v.is_empty(),
169            Part::Name(v) => v.is_empty(),
170        }
171    }
172
173    fn empty_split() -> Self {
174        Part::Split(String::new())
175    }
176
177    fn empty_name() -> Self {
178        Part::Name(Name::default())
179    }
180
181    fn push(&mut self, ch: char) {
182        match self {
183            Part::Split(v) => v.push(ch),
184            Part::Name(v) => v.name.push(ch),
185        }
186    }
187}
188
189#[derive(Debug, Default)]
190struct Pattern {
191    origin: String,
192    parts: Vec<Part>,
193}
194
195impl Deref for Pattern {
196    type Target = Vec<Part>;
197
198    fn deref(&self) -> &Self::Target {
199        &self.parts
200    }
201}
202
203impl std::str::FromStr for Pattern {
204    type Err = Error;
205
206    fn from_str(s: &str) -> Result<Self> {
207        let mut parts = vec![];
208        let mut cursor = Part::empty_split();
209
210        let origin = s.to_string();
211        let chars: Vec<char> = origin.chars().collect();
212
213        let mut pos = 0;
214        while pos < chars.len() {
215            let ch = chars[pos];
216            match (ch, &mut cursor) {
217                // if cursor is Split part, and found %{, then ready to start a Name part
218                ('%', Part::Split(_)) if matches!(chars.get(pos + 1), Some('{')) => {
219                    if !cursor.is_empty() {
220                        parts.push(cursor);
221                    }
222
223                    cursor = Part::empty_name();
224                    pos += 1; // skip '{'
225                }
226                // if cursor is Split part, and not found % or {, then continue the Split part
227                (_, Part::Split(_)) => {
228                    cursor.push(ch);
229                }
230                // if cursor is Name part, and found }, then end the Name part, start the next Split part
231                ('}', Part::Name(_)) => {
232                    parts.push(cursor);
233                    cursor = Part::empty_split();
234                }
235                ('+', Part::Name(name)) if !name.is_start_modifier_set() => {
236                    name.try_start_modifier(StartModifier::Append(None))?;
237                }
238                ('/', Part::Name(name)) if name.is_append_modifier_set() => {
239                    let mut order = 0;
240                    let mut j = pos + 1;
241                    while j < chars.len() {
242                        let digit = chars[j];
243                        if digit.is_ascii_digit() {
244                            order = order * 10 + digit.to_digit(10).unwrap();
245                            j += 1;
246                        } else {
247                            break;
248                        }
249                    }
250
251                    if j == pos + 1 {
252                        return DissectInvalidPatternSnafu {
253                            s,
254                            detail: "Digit order must be set after '/'",
255                        }
256                        .fail();
257                    }
258
259                    name.try_append_order(order)?;
260                    pos = j - 1; // this will change the position to the last digit of the order
261                }
262                ('?', Part::Name(name)) if !name.is_start_modifier_set() => {
263                    name.try_start_modifier(StartModifier::NamedSkip)?;
264                }
265                ('*', Part::Name(name)) if !name.is_start_modifier_set() => {
266                    name.try_start_modifier(StartModifier::MapKey)?;
267                }
268                ('&', Part::Name(name)) if !name.is_start_modifier_set() => {
269                    name.try_start_modifier(StartModifier::MapVal)?;
270                }
271                ('-', Part::Name(name)) if !name.is_end_modifier_set() => {
272                    if let Some('>') = chars.get(pos + 1) {
273                    } else {
274                        return DissectInvalidPatternSnafu {
275                            s,
276                            detail: "Expected '->' but only '-'",
277                        }
278                        .fail();
279                    }
280
281                    if let Some('}') = chars.get(pos + 2) {
282                    } else {
283                        return DissectInvalidPatternSnafu {
284                            s,
285                            detail: "Expected '}' after '->'",
286                        }
287                        .fail();
288                    }
289
290                    name.try_end_modifier()?;
291                    pos += 1; // only skip '>', the next loop will skip '}'
292                }
293                (_, Part::Name(name)) if !is_valid_char(ch) => {
294                    let tail: String = if name.is_name_empty() {
295                        format!("Invalid '{ch}'")
296                    } else {
297                        format!("Invalid '{ch}' in '{name}'")
298                    };
299                    return DissectInvalidPatternSnafu { s, detail: tail }.fail();
300                }
301                (_, Part::Name(_)) => {
302                    cursor.push(ch);
303                }
304            }
305
306            pos += 1;
307        }
308
309        match cursor {
310            Part::Split(ref split) if !split.is_empty() => parts.push(cursor),
311            Part::Name(name) if !name.is_empty() => {
312                return DissectInvalidPatternSnafu {
313                    s,
314                    detail: format!("'{name}' is not closed"),
315                }
316                .fail();
317            }
318            _ => {}
319        }
320
321        let pattern = Self { parts, origin };
322        pattern.check()?;
323        Ok(pattern)
324    }
325}
326
327impl Pattern {
328    fn check(&self) -> Result<()> {
329        if self.is_empty() {
330            return DissectEmptyPatternSnafu.fail();
331        }
332
333        let mut map_keys = HashSet::new();
334        let mut map_vals = HashSet::new();
335
336        for i in 0..self.len() {
337            let this_part = &self[i];
338            let next_part = self.get(i + 1);
339            match (this_part, next_part) {
340                (Part::Split(split), _) if split.is_empty() => {
341                    return DissectInvalidPatternSnafu {
342                        s: &self.origin,
343                        detail: "Empty split is not allowed",
344                    }
345                    .fail();
346                }
347                (Part::Name(name1), Some(Part::Name(name2))) => {
348                    return DissectInvalidPatternSnafu {
349                        s: &self.origin,
350                        detail: format!("consecutive names are not allowed: '{name1}' '{name2}'",),
351                    }
352                    .fail();
353                }
354                (Part::Name(name), _) if name.is_name_empty() => {
355                    if let Some(ref m) = name.start_modifier {
356                        return DissectInvalidPatternSnafu {
357                            s: &self.origin,
358                            detail: format!("only '{m}' modifier is invalid"),
359                        }
360                        .fail();
361                    }
362                }
363                (Part::Name(name), _) => match name.start_modifier {
364                    Some(StartModifier::MapKey) => {
365                        if map_keys.contains(&name.name) {
366                            return DissectInvalidPatternSnafu {
367                                s: &self.origin,
368                                detail: format!("Duplicate map key: '{}'", name.name),
369                            }
370                            .fail();
371                        } else {
372                            map_keys.insert(&name.name);
373                        }
374                    }
375                    Some(StartModifier::MapVal) => {
376                        if map_vals.contains(&name.name) {
377                            return DissectInvalidPatternSnafu {
378                                s: &self.origin,
379                                detail: format!("Duplicate map val: '{}'", name.name),
380                            }
381                            .fail();
382                        } else {
383                            map_vals.insert(&name.name);
384                        }
385                    }
386                    _ => {}
387                },
388                _ => {}
389            }
390        }
391
392        if map_keys != map_vals {
393            return DissectInvalidPatternSnafu {
394                s: &self.origin,
395                detail: format!(
396                    "key and value not matched: '{}'",
397                    map_keys
398                        .symmetric_difference(&map_vals)
399                        .map(|s| s.as_str())
400                        .collect::<Vec<&str>>()
401                        .join(",")
402                ),
403            }
404            .fail();
405        }
406
407        Ok(())
408    }
409}
410
411#[derive(Debug, Default)]
412pub struct DissectProcessor {
413    fields: Fields,
414    patterns: Vec<Pattern>,
415    ignore_missing: bool,
416
417    // The character(s) that separate the appended fields. Default is an empty string.
418    append_separator: Option<String>,
419}
420
421impl DissectProcessor {
422    fn process_name_value<'a>(
423        name: &'a Name,
424        value: String,
425        appends: &mut HashMap<&'a String, Vec<(String, u32)>>,
426        map: &mut Vec<(&'a String, VrlValue)>,
427    ) {
428        match name.start_modifier {
429            Some(StartModifier::NamedSkip) => {
430                // do nothing, ignore this match
431            }
432            Some(StartModifier::Append(order)) => {
433                appends
434                    .entry(&name.name)
435                    .or_default()
436                    .push((value, order.unwrap_or_default()));
437            }
438            Some(_) => {
439                // do nothing, ignore MapKey and MapVal
440                // because transform can know the key name
441            }
442            None => {
443                map.push((&name.name, VrlValue::Bytes(Bytes::from(value))));
444            }
445        }
446    }
447
448    fn process_pattern(
449        &self,
450        chs: &[char],
451        pattern: &Pattern,
452    ) -> Result<Vec<(KeyString, VrlValue)>> {
453        let mut map = Vec::new();
454        let mut pos = 0;
455
456        let mut appends: HashMap<&String, Vec<(String, u32)>> = HashMap::new();
457
458        for i in 0..pattern.len() {
459            let this_part = &pattern[i];
460            let next_part = pattern.get(i + 1);
461            match (this_part, next_part) {
462                // if Split part, and exactly matches, then move pos split.len() forward
463                (Part::Split(split), _) => {
464                    let split_chs = split.chars().collect::<Vec<char>>();
465                    let split_len = split_chs.len();
466                    if pos + split_len > chs.len() {
467                        return DissectSplitExceedsInputSnafu { split }.fail();
468                    }
469
470                    if &chs[pos..pos + split_len] != split_chs.as_slice() {
471                        return DissectSplitNotMatchInputSnafu {
472                            split,
473                            input: chs[pos..pos + split_len].iter().collect::<String>(),
474                        }
475                        .fail();
476                    }
477
478                    pos += split_len;
479                }
480
481                (Part::Name(name1), Some(Part::Name(name2))) => {
482                    return DissectConsecutiveNamesSnafu {
483                        name1: name1.to_string(),
484                        name2: name2.to_string(),
485                    }
486                    .fail();
487                }
488
489                // if Name part is the last part, then the rest of the input is the value
490                (Part::Name(name), None) => {
491                    let value = chs[pos..].iter().collect::<String>();
492                    Self::process_name_value(name, value, &mut appends, &mut map);
493                }
494
495                // if Name part, and next part is Split, then find the matched value of the name
496                (Part::Name(name), Some(Part::Split(split))) => {
497                    let stop = split.chars().next().context(DissectInvalidPatternSnafu {
498                        s: &pattern.origin,
499                        detail: "Empty split is not allowed",
500                    })?; // this won't happen
501                    let mut end = pos;
502                    while end < chs.len() && chs[end] != stop {
503                        end += 1;
504                    }
505
506                    if !name.is_name_empty() {
507                        let value = chs[pos..end].iter().collect::<String>();
508                        Self::process_name_value(name, value, &mut appends, &mut map);
509                    }
510
511                    if name.is_end_modifier_set() {
512                        while end < chs.len() && chs[end] == stop {
513                            end += 1;
514                        }
515                        end -= 1; // leave the last stop character to match the next split
516                    }
517
518                    pos = end;
519                }
520            }
521        }
522
523        if !appends.is_empty() {
524            let sep = match self.append_separator {
525                Some(ref sep) => sep,
526                None => " ",
527            };
528
529            for (name, mut values) in appends {
530                values.sort_by(|a, b| a.1.cmp(&b.1));
531                let value = values.into_iter().map(|(a, _)| a).join(sep);
532                map.push((name, VrlValue::Bytes(Bytes::from(value))));
533            }
534        }
535
536        Ok(map
537            .into_iter()
538            .map(|(k, v)| (KeyString::from(k.clone()), v))
539            .collect())
540    }
541
542    fn process(&self, val: &str) -> Result<Vec<(KeyString, VrlValue)>> {
543        let chs = val.chars().collect::<Vec<char>>();
544
545        for pattern in &self.patterns {
546            if let Ok(map) = self.process_pattern(&chs, pattern) {
547                return Ok(map);
548            }
549        }
550        DissectNoMatchingPatternSnafu.fail()
551    }
552}
553
554impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
555    type Error = Error;
556
557    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
558        let mut fields = Fields::default();
559        let mut patterns = vec![];
560        let mut ignore_missing = false;
561        let mut append_separator = None;
562
563        for (k, v) in value.iter() {
564            let key = k
565                .as_str()
566                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
567
568            match key {
569                FIELD_NAME => {
570                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
571                }
572                FIELDS_NAME => {
573                    fields = yaml_new_fields(v, FIELDS_NAME)?;
574                }
575                PATTERN_NAME => {
576                    let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?;
577                    patterns = vec![pattern];
578                }
579                PATTERNS_NAME => {
580                    patterns = yaml_parse_strings(v, PATTERNS_NAME)?;
581                }
582                IGNORE_MISSING_NAME => {
583                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
584                }
585                APPEND_SEPARATOR_NAME => {
586                    append_separator = Some(yaml_string(v, APPEND_SEPARATOR_NAME)?);
587                }
588                _ => {}
589            }
590        }
591        // let output_keys = Self::build_output_keys(&patterns);
592        let builder = DissectProcessor {
593            fields,
594            patterns,
595            ignore_missing,
596            append_separator,
597        };
598
599        Ok(builder)
600    }
601}
602
603impl Processor for DissectProcessor {
604    fn kind(&self) -> &str {
605        PROCESSOR_DISSECT
606    }
607
608    fn ignore_missing(&self) -> bool {
609        self.ignore_missing
610    }
611
612    fn exec_mut(&self, mut val: VrlValue) -> Result<VrlValue> {
613        for field in self.fields.iter() {
614            let index = field.input_field();
615            let val = val.as_object_mut().context(ValueMustBeMapSnafu)?;
616            match val.get(index) {
617                Some(VrlValue::Bytes(val_str)) => {
618                    let r = self.process(String::from_utf8_lossy(val_str).as_ref())?;
619                    for (k, v) in r {
620                        val.insert(k, v);
621                    }
622                }
623                Some(VrlValue::Null) | None => {
624                    if !self.ignore_missing {
625                        return ProcessorMissingFieldSnafu {
626                            processor: self.kind(),
627                            field: field.input_field(),
628                        }
629                        .fail();
630                    }
631                }
632                Some(v) => {
633                    return ProcessorExpectStringSnafu {
634                        processor: self.kind(),
635                        v: v.clone(),
636                    }
637                    .fail();
638                }
639            }
640        }
641        Ok(val)
642    }
643}
644
/// Returns whether `ch` may be part of a field name: an underscore or any
/// character for which `char::is_alphanumeric` is true.
fn is_valid_char(ch: char) -> bool {
    matches!(ch, '_') || ch.is_alphanumeric()
}
648
649#[cfg(test)]
650mod tests {
651    use ahash::HashMap;
652    use vrl::prelude::Bytes;
653    use vrl::value::{KeyString, Value as VrlValue};
654
655    use super::{DissectProcessor, EndModifier, Name, Part, StartModifier};
656    use crate::etl::processor::dissect::Pattern;
657
658    fn assert(pattern_str: &str, input: &str, expected: HashMap<KeyString, VrlValue>) {
659        let chs = input.chars().collect::<Vec<char>>();
660        let patterns: Vec<Pattern> = vec![pattern_str.parse().unwrap()];
661
662        let processor = DissectProcessor::default();
663        let result: HashMap<KeyString, VrlValue> = processor
664            .process_pattern(&chs, &patterns[0])
665            .unwrap()
666            .into_iter()
667            .collect();
668
669        assert_eq!(result, expected, "pattern: {}", pattern_str);
670    }
671
672    #[test]
673    fn test_dissect_simple_pattern() {
674        let cases = [(
675            "%{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}",
676            vec![
677                Part::Name("clientip".into()),
678                Part::Split(" ".into()),
679                Part::Name("ident".into()),
680                Part::Split(" ".into()),
681                Part::Name("auth".into()),
682                Part::Split(" [".into()),
683                Part::Name("timestamp".into()),
684                Part::Split("] \"".into()),
685                Part::Name("verb".into()),
686                Part::Split(" ".into()),
687                Part::Name("request".into()),
688                Part::Split(" HTTP/".into()),
689                Part::Name("httpversion".into()),
690                Part::Split("\" ".into()),
691                Part::Name("status".into()),
692                Part::Split(" ".into()),
693                Part::Name("size".into()),
694            ],
695        )];
696
697        for (pattern, expected) in cases.into_iter() {
698            let p: Pattern = pattern.parse().unwrap();
699            assert_eq!(p.parts, expected);
700        }
701    }
702
703    #[test]
704    fn test_dissect_modifier_pattern() {
705        let cases = [
706            (
707                "%{} %{}",
708                vec![
709                    Part::Name(Name {
710                        name: "".into(),
711                        start_modifier: None,
712                        end_modifier: None,
713                    }),
714                    Part::Split(" ".into()),
715                    Part::Name(Name {
716                        name: "".into(),
717                        start_modifier: None,
718                        end_modifier: None,
719                    }),
720                ],
721            ),
722            (
723                "%{ts->} %{level}",
724                vec![
725                    Part::Name(Name {
726                        name: "ts".into(),
727                        start_modifier: None,
728                        end_modifier: Some(EndModifier),
729                    }),
730                    Part::Split(" ".into()),
731                    Part::Name("level".into()),
732                ],
733            ),
734            (
735                "[%{ts}]%{->}[%{level}]",
736                vec![
737                    Part::Split("[".into()),
738                    Part::Name(Name {
739                        name: "ts".into(),
740                        start_modifier: None,
741                        end_modifier: None,
742                    }),
743                    Part::Split("]".into()),
744                    Part::Name(Name {
745                        name: "".into(),
746                        start_modifier: None,
747                        end_modifier: Some(EndModifier),
748                    }),
749                    Part::Split("[".into()),
750                    Part::Name(Name {
751                        name: "level".into(),
752                        start_modifier: None,
753                        end_modifier: None,
754                    }),
755                    Part::Split("]".into()),
756                ],
757            ),
758            (
759                "%{+name} %{+name} %{+name} %{+name}",
760                vec![
761                    Part::Name(Name {
762                        name: "name".into(),
763                        start_modifier: Some(StartModifier::Append(None)),
764                        end_modifier: None,
765                    }),
766                    Part::Split(" ".into()),
767                    Part::Name(Name {
768                        name: "name".into(),
769                        start_modifier: Some(StartModifier::Append(None)),
770                        end_modifier: None,
771                    }),
772                    Part::Split(" ".into()),
773                    Part::Name(Name {
774                        name: "name".into(),
775                        start_modifier: Some(StartModifier::Append(None)),
776                        end_modifier: None,
777                    }),
778                    Part::Split(" ".into()),
779                    Part::Name(Name {
780                        name: "name".into(),
781                        start_modifier: Some(StartModifier::Append(None)),
782                        end_modifier: None,
783                    }),
784                ],
785            ),
786            (
787                "%{+name/2} %{+name/4} %{+name/3} %{+name/1}",
788                vec![
789                    Part::Name(Name {
790                        name: "name".into(),
791                        start_modifier: Some(StartModifier::Append(Some(2))),
792                        end_modifier: None,
793                    }),
794                    Part::Split(" ".into()),
795                    Part::Name(Name {
796                        name: "name".into(),
797                        start_modifier: Some(StartModifier::Append(Some(4))),
798                        end_modifier: None,
799                    }),
800                    Part::Split(" ".into()),
801                    Part::Name(Name {
802                        name: "name".into(),
803                        start_modifier: Some(StartModifier::Append(Some(3))),
804                        end_modifier: None,
805                    }),
806                    Part::Split(" ".into()),
807                    Part::Name(Name {
808                        name: "name".into(),
809                        start_modifier: Some(StartModifier::Append(Some(1))),
810                        end_modifier: None,
811                    }),
812                ],
813            ),
814            (
815                "%{clientip} %{?ident} %{?auth} [%{timestamp}]",
816                vec![
817                    Part::Name(Name {
818                        name: "clientip".into(),
819                        start_modifier: None,
820                        end_modifier: None,
821                    }),
822                    Part::Split(" ".into()),
823                    Part::Name(Name {
824                        name: "ident".into(),
825                        start_modifier: Some(StartModifier::NamedSkip),
826                        end_modifier: None,
827                    }),
828                    Part::Split(" ".into()),
829                    Part::Name(Name {
830                        name: "auth".into(),
831                        start_modifier: Some(StartModifier::NamedSkip),
832                        end_modifier: None,
833                    }),
834                    Part::Split(" [".into()),
835                    Part::Name(Name {
836                        name: "timestamp".into(),
837                        start_modifier: None,
838                        end_modifier: None,
839                    }),
840                    Part::Split("]".into()),
841                ],
842            ),
843            (
844                "[%{ts}] [%{level}] %{*p1}:%{&p1} %{*p2}:%{&p2}",
845                vec![
846                    Part::Split("[".into()),
847                    Part::Name(Name {
848                        name: "ts".into(),
849                        start_modifier: None,
850                        end_modifier: None,
851                    }),
852                    Part::Split("] [".into()),
853                    Part::Name(Name {
854                        name: "level".into(),
855                        start_modifier: None,
856                        end_modifier: None,
857                    }),
858                    Part::Split("] ".into()),
859                    Part::Name(Name {
860                        name: "p1".into(),
861                        start_modifier: Some(StartModifier::MapKey),
862                        end_modifier: None,
863                    }),
864                    Part::Split(":".into()),
865                    Part::Name(Name {
866                        name: "p1".into(),
867                        start_modifier: Some(StartModifier::MapVal),
868                        end_modifier: None,
869                    }),
870                    Part::Split(" ".into()),
871                    Part::Name(Name {
872                        name: "p2".into(),
873                        start_modifier: Some(StartModifier::MapKey),
874                        end_modifier: None,
875                    }),
876                    Part::Split(":".into()),
877                    Part::Name(Name {
878                        name: "p2".into(),
879                        start_modifier: Some(StartModifier::MapVal),
880                        end_modifier: None,
881                    }),
882                ],
883            ),
884            (
885                "%{&p1}:%{*p1}",
886                vec![
887                    Part::Name(Name {
888                        name: "p1".into(),
889                        start_modifier: Some(StartModifier::MapVal),
890                        end_modifier: None,
891                    }),
892                    Part::Split(":".into()),
893                    Part::Name(Name {
894                        name: "p1".into(),
895                        start_modifier: Some(StartModifier::MapKey),
896                        end_modifier: None,
897                    }),
898                ],
899            ),
900        ];
901
902        for (pattern, expected) in cases.into_iter() {
903            let p: Pattern = pattern.parse().unwrap();
904            assert_eq!(p.parts, expected);
905        }
906    }
907
908    #[test]
909    fn test_dissect_invalid_pattern() {
910        let cases = [
911            ("", "Empty pattern is not allowed"),
912            (
913                "%{name1}%{name2}",
914                "Invalid Pattern: '%{name1}%{name2}'. consecutive names are not allowed: 'name1' 'name2'"
915            ),
916            (
917                "%{} %{ident",
918                "Invalid Pattern: '%{} %{ident'. 'ident' is not closed",
919            ),
920            (
921                "%{->clientip} ",
922                "Invalid Pattern: '%{->clientip} '. Expected '}' after '->'",
923            ),
924            (
925                "%{/clientip} ",
926                "Invalid Pattern: '%{/clientip} '. Invalid '/'",
927            ),
928            (
929                "%{+?clientip} ",
930                "Invalid Pattern: '%{+?clientip} '. Invalid '?'",
931            ),
932            (
933                "%{+clientip/} ",
934                "Invalid Pattern: '%{+clientip/} '. Digit order must be set after '/'",
935            ),
936            (
937                "%{+clientip/a} ",
938                "Invalid Pattern: '%{+clientip/a} '. Digit order must be set after '/'",
939            ),
940            (
941                "%{clientip/1} ",
942                "Invalid Pattern: '%{clientip/1} '. Invalid '/' in 'clientip'",
943            ),
944            (
945                "%{+clientip/1/2} ",
946                "Append Order modifier is already set to '1', cannot be set to 2",
947            ),
948            (
949                "%{+/1} ",
950                "Invalid Pattern: '%{+/1} '. only '+/1' modifier is invalid",
951            ),
952            (
953                "%{+} ",
954                "Invalid Pattern: '%{+} '. only '+' modifier is invalid",
955            ),
956            (
957                "%{?} ",
958                "Invalid Pattern: '%{?} '. only '?' modifier is invalid",
959            ),
960            (
961                "%{*} ",
962                "Invalid Pattern: '%{*} '. only '*' modifier is invalid",
963            ),
964            (
965                "%{&} ",
966                "Invalid Pattern: '%{&} '. only '&' modifier is invalid",
967            ),
968            (
969                "%{*ip}",
970                "Invalid Pattern: '%{*ip}'. key and value not matched: 'ip'"
971            ),
972            (
973                "%{*ip} %{*ip}",
974                "Invalid Pattern: '%{*ip} %{*ip}'. Duplicate map key: 'ip'",
975            ),
976            (
977                "%{*ip1} %{&ip2}",
978                "Invalid Pattern: '%{*ip1} %{&ip2}'. key and value not matched: 'ip1,ip2'"
979            ),
980        ];
981
982        for (pattern, expected) in cases.into_iter() {
983            let err = pattern.parse::<Pattern>().unwrap_err();
984            assert_eq!(err.to_string(), expected);
985        }
986    }
987
988    #[test]
989    fn test_dissect_process() {
990        let expected = [
991            ("timestamp", "30/Apr/1998:22:00:52 +0000"),
992            ("status", "200"),
993            ("clientip", "1.2.3.4"),
994            ("ident", "-"),
995            ("size", "3171"),
996            (
997                "request",
998                "/english/venues/cities/images/montpellier/18.gif",
999            ),
1000            ("auth", "-"),
1001            ("verb", "GET"),
1002            ("httpversion", "1.0"),
1003        ]
1004        .into_iter()
1005        .map(|(k, v)| {
1006            (
1007                KeyString::from(k.to_string()),
1008                VrlValue::Bytes(Bytes::from(v.to_string())),
1009            )
1010        })
1011        .collect::<HashMap<KeyString, VrlValue>>();
1012
1013        {
1014            // pattern start with Name
1015            let pattern_str = "%{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}";
1016            let input = "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171";
1017
1018            assert(pattern_str, input, expected.clone());
1019        }
1020
1021        {
1022            // pattern start with Split
1023            let pattern_str = " %{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}";
1024            let input = " 1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171";
1025
1026            assert(pattern_str, input, expected);
1027        }
1028    }
1029
1030    #[test]
1031    fn test_dissect_right_padding_modifier() {
1032        let cases = [
1033            (
1034                "%{ts->} %{level}",
1035                "1998-08-10T17:15:42,466          WARN",
1036                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1037            ),
1038            (
1039                "[%{ts}]%{->}[%{level}]",
1040                "[1998-08-10T17:15:42,466]            [WARN]",
1041                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1042            ),
1043            (
1044                "[%{ts}]%{->}[%{level}]",
1045                "[1998-08-10T17:15:42,466]            [[[[WARN]",
1046                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1047            ),
1048        ]
1049        .into_iter()
1050        .map(|(pattern, input, expected)| {
1051            let map = expected.into_iter().map(|(k, v)| {
1052                (
1053                    KeyString::from(k.to_string()),
1054                    VrlValue::Bytes(Bytes::from(v.to_string())),
1055                )
1056            });
1057            (pattern, input, map)
1058        });
1059
1060        for (pattern_str, input, expected) in cases {
1061            assert(
1062                pattern_str,
1063                input,
1064                expected.collect::<HashMap<KeyString, VrlValue>>(),
1065            );
1066        }
1067    }
1068
1069    #[test]
1070    fn test_dissect_append_modifier() {
1071        let cases = [
1072            (
1073                "%{+name} %{+name} %{+name} %{+name}",
1074                "john jacob jingleheimer schmidt",
1075                [("name", "john jacob jingleheimer schmidt")],
1076            ),
1077            (
1078                "%{+name/2} %{+name/4} %{+name/3} %{+name/1}",
1079                "john jacob jingleheimer schmidt",
1080                [("name", "schmidt john jingleheimer jacob")],
1081            ),
1082        ]
1083        .into_iter()
1084        .map(|(pattern, input, expected)| {
1085            let map = expected.into_iter().map(|(k, v)| {
1086                (
1087                    KeyString::from(k.to_string()),
1088                    VrlValue::Bytes(Bytes::from(v.to_string())),
1089                )
1090            });
1091            (pattern, input, map)
1092        });
1093
1094        for (pattern_str, input, expected) in cases {
1095            assert(
1096                pattern_str,
1097                input,
1098                expected.collect::<HashMap<KeyString, VrlValue>>(),
1099            );
1100        }
1101    }
1102
1103    #[test]
1104    fn test_dissect_named_skip_modifier() {
1105        let cases = [(
1106            "%{clientip} %{?ident} %{?auth} [%{timestamp}]",
1107            "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]",
1108            [
1109                ("clientip", "1.2.3.4"),
1110                ("timestamp", "30/Apr/1998:22:00:52 +0000"),
1111            ],
1112        )]
1113        .into_iter()
1114        .map(|(pattern, input, expected)| {
1115            let map = expected.into_iter().map(|(k, v)| {
1116                (
1117                    KeyString::from(k.to_string()),
1118                    VrlValue::Bytes(Bytes::from(v.to_string())),
1119                )
1120            });
1121            (pattern, input, map)
1122        });
1123
1124        for (pattern_str, input, expected) in cases {
1125            assert(
1126                pattern_str,
1127                input,
1128                expected.collect::<HashMap<KeyString, VrlValue>>(),
1129            );
1130        }
1131    }
1132}