pipeline/etl/processor/
dissect.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use itertools::Itertools;
19use snafu::OptionExt;
20
21use crate::error::{
22    DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
23    DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
24    DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,
25    DissectOrderOnlyAppendSnafu, DissectSplitExceedsInputSnafu, DissectSplitNotMatchInputSnafu,
26    Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
27};
28use crate::etl::field::Fields;
29use crate::etl::processor::{
30    yaml_bool, yaml_new_field, yaml_new_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
31    Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
32};
33use crate::etl::value::Value;
34use crate::etl::PipelineMap;
35
/// Kind identifier of this processor; returned by `DissectProcessor::kind`.
pub(crate) const PROCESSOR_DISSECT: &str = "dissect";

/// Configuration key from which the separator for appended values is read.
const APPEND_SEPARATOR_NAME: &str = "append_separator";
39
/// Modifier written at the beginning of a `%{...}` capture.
#[derive(Debug, PartialEq)]
enum StartModifier {
    /// `+`, optionally carrying an explicit `/<order>`: values sharing the name are joined.
    Append(Option<u32>),
    /// `?`: the capture is matched but its value is not emitted.
    NamedSkip,
    /// `*`: the capture is flagged as a map key.
    MapKey,
    /// `&`: the capture is flagged as a map value.
    MapVal,
}

impl std::fmt::Display for StartModifier {
    /// Writes the modifier using the same symbols the pattern syntax uses.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Append(Some(order)) => write!(f, "+/{order}"),
            Self::Append(None) => f.write_str("+"),
            Self::NamedSkip => f.write_str("?"),
            Self::MapKey => f.write_str("*"),
            Self::MapVal => f.write_str("&"),
        }
    }
}
61
/// The `->` modifier written right before the closing `}` of a capture.
#[derive(Debug, PartialEq)]
struct EndModifier;

impl std::fmt::Display for EndModifier {
    /// Writes the modifier as it appears in a pattern.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str("->")
    }
}
70
71impl Name {
72    fn is_name_empty(&self) -> bool {
73        self.name.is_empty()
74    }
75
76    fn is_empty(&self) -> bool {
77        self.name.is_empty() && self.start_modifier.is_none() && self.end_modifier.is_none()
78    }
79
80    fn try_start_modifier(&mut self, modifier: StartModifier) -> Result<()> {
81        match &self.start_modifier {
82            Some(m) => DissectModifierAlreadySetSnafu {
83                m: m.to_string(),
84                modifier: modifier.to_string(),
85            }
86            .fail(),
87            None => {
88                self.start_modifier = Some(modifier);
89                Ok(())
90            }
91        }
92    }
93
94    fn try_append_order(&mut self, order: u32) -> Result<()> {
95        match &mut self.start_modifier {
96            Some(StartModifier::Append(o)) => match o {
97                Some(n) => DissectAppendOrderAlreadySetSnafu {
98                    n: n.to_string(),
99                    order,
100                }
101                .fail(),
102                None => {
103                    *o = Some(order);
104                    Ok(())
105                }
106            },
107            Some(m) => DissectOrderOnlyAppendSnafu { m: m.to_string() }.fail(),
108            None => DissectOrderOnlyAppendModifierSnafu.fail(),
109        }
110    }
111
112    fn try_end_modifier(&mut self) -> Result<()> {
113        match &self.end_modifier {
114            Some(m) => DissectEndModifierAlreadySetSnafu { m: m.to_string() }.fail(),
115            None => {
116                self.end_modifier = Some(EndModifier);
117                Ok(())
118            }
119        }
120    }
121
122    fn is_append_modifier_set(&self) -> bool {
123        matches!(self.start_modifier, Some(StartModifier::Append(_)))
124    }
125
126    fn is_start_modifier_set(&self) -> bool {
127        self.start_modifier.is_some()
128    }
129
130    fn is_end_modifier_set(&self) -> bool {
131        self.end_modifier.is_some()
132    }
133}
134
135#[derive(Debug, PartialEq, Default)]
136struct Name {
137    name: String,
138    start_modifier: Option<StartModifier>,
139    end_modifier: Option<EndModifier>,
140}
141
142impl std::fmt::Display for Name {
143    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
144        write!(f, "{}", self.name)
145    }
146}
147
148impl From<&str> for Name {
149    fn from(value: &str) -> Self {
150        Name {
151            name: value.to_string(),
152            start_modifier: None,
153            end_modifier: None,
154        }
155    }
156}
157
158#[derive(Debug, PartialEq)]
159enum Part {
160    Split(String),
161    Name(Name),
162}
163
164impl Part {
165    fn is_empty(&self) -> bool {
166        match self {
167            Part::Split(v) => v.is_empty(),
168            Part::Name(v) => v.is_empty(),
169        }
170    }
171
172    fn empty_split() -> Self {
173        Part::Split(String::new())
174    }
175
176    fn empty_name() -> Self {
177        Part::Name(Name::default())
178    }
179
180    fn push(&mut self, ch: char) {
181        match self {
182            Part::Split(v) => v.push(ch),
183            Part::Name(v) => v.name.push(ch),
184        }
185    }
186}
187
188#[derive(Debug, Default)]
189struct Pattern {
190    origin: String,
191    parts: Vec<Part>,
192}
193
194impl Deref for Pattern {
195    type Target = Vec<Part>;
196
197    fn deref(&self) -> &Self::Target {
198        &self.parts
199    }
200}
201
202impl std::str::FromStr for Pattern {
203    type Err = Error;
204
205    fn from_str(s: &str) -> Result<Self> {
206        let mut parts = vec![];
207        let mut cursor = Part::empty_split();
208
209        let origin = s.to_string();
210        let chars: Vec<char> = origin.chars().collect();
211
212        let mut pos = 0;
213        while pos < chars.len() {
214            let ch = chars[pos];
215            match (ch, &mut cursor) {
216                // if cursor is Split part, and found %{, then ready to start a Name part
217                ('%', Part::Split(_)) if matches!(chars.get(pos + 1), Some('{')) => {
218                    if !cursor.is_empty() {
219                        parts.push(cursor);
220                    }
221
222                    cursor = Part::empty_name();
223                    pos += 1; // skip '{'
224                }
225                // if cursor is Split part, and not found % or {, then continue the Split part
226                (_, Part::Split(_)) => {
227                    cursor.push(ch);
228                }
229                // if cursor is Name part, and found }, then end the Name part, start the next Split part
230                ('}', Part::Name(_)) => {
231                    parts.push(cursor);
232                    cursor = Part::empty_split();
233                }
234                ('+', Part::Name(name)) if !name.is_start_modifier_set() => {
235                    name.try_start_modifier(StartModifier::Append(None))?;
236                }
237                ('/', Part::Name(name)) if name.is_append_modifier_set() => {
238                    let mut order = 0;
239                    let mut j = pos + 1;
240                    while j < chars.len() {
241                        let digit = chars[j];
242                        if digit.is_ascii_digit() {
243                            order = order * 10 + digit.to_digit(10).unwrap();
244                            j += 1;
245                        } else {
246                            break;
247                        }
248                    }
249
250                    if j == pos + 1 {
251                        return DissectInvalidPatternSnafu {
252                            s,
253                            detail: "Digit order must be set after '/'",
254                        }
255                        .fail();
256                    }
257
258                    name.try_append_order(order)?;
259                    pos = j - 1; // this will change the position to the last digit of the order
260                }
261                ('?', Part::Name(name)) if !name.is_start_modifier_set() => {
262                    name.try_start_modifier(StartModifier::NamedSkip)?;
263                }
264                ('*', Part::Name(name)) if !name.is_start_modifier_set() => {
265                    name.try_start_modifier(StartModifier::MapKey)?;
266                }
267                ('&', Part::Name(name)) if !name.is_start_modifier_set() => {
268                    name.try_start_modifier(StartModifier::MapVal)?;
269                }
270                ('-', Part::Name(name)) if !name.is_end_modifier_set() => {
271                    if let Some('>') = chars.get(pos + 1) {
272                    } else {
273                        return DissectInvalidPatternSnafu {
274                            s,
275                            detail: "Expected '->' but only '-'",
276                        }
277                        .fail();
278                    }
279
280                    if let Some('}') = chars.get(pos + 2) {
281                    } else {
282                        return DissectInvalidPatternSnafu {
283                            s,
284                            detail: "Expected '}' after '->'",
285                        }
286                        .fail();
287                    }
288
289                    name.try_end_modifier()?;
290                    pos += 1; // only skip '>', the next loop will skip '}'
291                }
292                (_, Part::Name(name)) if !is_valid_char(ch) => {
293                    let tail: String = if name.is_name_empty() {
294                        format!("Invalid '{ch}'")
295                    } else {
296                        format!("Invalid '{ch}' in '{name}'")
297                    };
298                    return DissectInvalidPatternSnafu { s, detail: tail }.fail();
299                }
300                (_, Part::Name(_)) => {
301                    cursor.push(ch);
302                }
303            }
304
305            pos += 1;
306        }
307
308        match cursor {
309            Part::Split(ref split) if !split.is_empty() => parts.push(cursor),
310            Part::Name(name) if !name.is_empty() => {
311                return DissectInvalidPatternSnafu {
312                    s,
313                    detail: format!("'{name}' is not closed"),
314                }
315                .fail();
316            }
317            _ => {}
318        }
319
320        let pattern = Self { parts, origin };
321        pattern.check()?;
322        Ok(pattern)
323    }
324}
325
326impl Pattern {
327    fn check(&self) -> Result<()> {
328        if self.is_empty() {
329            return DissectEmptyPatternSnafu.fail();
330        }
331
332        let mut map_keys = HashSet::new();
333        let mut map_vals = HashSet::new();
334
335        for i in 0..self.len() {
336            let this_part = &self[i];
337            let next_part = self.get(i + 1);
338            match (this_part, next_part) {
339                (Part::Split(split), _) if split.is_empty() => {
340                    return DissectInvalidPatternSnafu {
341                        s: &self.origin,
342                        detail: "Empty split is not allowed",
343                    }
344                    .fail();
345                }
346                (Part::Name(name1), Some(Part::Name(name2))) => {
347                    return DissectInvalidPatternSnafu {
348                        s: &self.origin,
349                        detail: format!("consecutive names are not allowed: '{name1}' '{name2}'",),
350                    }
351                    .fail();
352                }
353                (Part::Name(name), _) if name.is_name_empty() => {
354                    if let Some(ref m) = name.start_modifier {
355                        return DissectInvalidPatternSnafu {
356                            s: &self.origin,
357                            detail: format!("only '{m}' modifier is invalid"),
358                        }
359                        .fail();
360                    }
361                }
362                (Part::Name(name), _) => match name.start_modifier {
363                    Some(StartModifier::MapKey) => {
364                        if map_keys.contains(&name.name) {
365                            return DissectInvalidPatternSnafu {
366                                s: &self.origin,
367                                detail: format!("Duplicate map key: '{}'", name.name),
368                            }
369                            .fail();
370                        } else {
371                            map_keys.insert(&name.name);
372                        }
373                    }
374                    Some(StartModifier::MapVal) => {
375                        if map_vals.contains(&name.name) {
376                            return DissectInvalidPatternSnafu {
377                                s: &self.origin,
378                                detail: format!("Duplicate map val: '{}'", name.name),
379                            }
380                            .fail();
381                        } else {
382                            map_vals.insert(&name.name);
383                        }
384                    }
385                    _ => {}
386                },
387                _ => {}
388            }
389        }
390
391        if map_keys != map_vals {
392            return DissectInvalidPatternSnafu {
393                s: &self.origin,
394                detail: format!(
395                    "key and value not matched: '{}'",
396                    map_keys
397                        .symmetric_difference(&map_vals)
398                        .map(|s| s.as_str())
399                        .collect::<Vec<&str>>()
400                        .join(",")
401                ),
402            }
403            .fail();
404        }
405
406        Ok(())
407    }
408}
409
410#[derive(Debug, Default)]
411pub struct DissectProcessor {
412    fields: Fields,
413    patterns: Vec<Pattern>,
414    ignore_missing: bool,
415
416    // The character(s) that separate the appended fields. Default is an empty string.
417    append_separator: Option<String>,
418}
419
420impl DissectProcessor {
421    fn process_name_value<'a>(
422        name: &'a Name,
423        value: String,
424        appends: &mut HashMap<&'a String, Vec<(String, u32)>>,
425        map: &mut Vec<(&'a String, Value)>,
426    ) {
427        match name.start_modifier {
428            Some(StartModifier::NamedSkip) => {
429                // do nothing, ignore this match
430            }
431            Some(StartModifier::Append(order)) => {
432                appends
433                    .entry(&name.name)
434                    .or_default()
435                    .push((value, order.unwrap_or_default()));
436            }
437            Some(_) => {
438                // do nothing, ignore MapKey and MapVal
439                // because transform can know the key name
440            }
441            None => {
442                map.push((&name.name, Value::String(value)));
443            }
444        }
445    }
446
447    fn process_pattern(&self, chs: &[char], pattern: &Pattern) -> Result<Vec<(String, Value)>> {
448        let mut map = Vec::new();
449        let mut pos = 0;
450
451        let mut appends: HashMap<&String, Vec<(String, u32)>> = HashMap::new();
452
453        for i in 0..pattern.len() {
454            let this_part = &pattern[i];
455            let next_part = pattern.get(i + 1);
456            match (this_part, next_part) {
457                // if Split part, and exactly matches, then move pos split.len() forward
458                (Part::Split(split), _) => {
459                    let split_chs = split.chars().collect::<Vec<char>>();
460                    let split_len = split_chs.len();
461                    if pos + split_len > chs.len() {
462                        return DissectSplitExceedsInputSnafu { split }.fail();
463                    }
464
465                    if &chs[pos..pos + split_len] != split_chs.as_slice() {
466                        return DissectSplitNotMatchInputSnafu {
467                            split,
468                            input: chs[pos..pos + split_len].iter().collect::<String>(),
469                        }
470                        .fail();
471                    }
472
473                    pos += split_len;
474                }
475
476                (Part::Name(name1), Some(Part::Name(name2))) => {
477                    return DissectConsecutiveNamesSnafu {
478                        name1: name1.to_string(),
479                        name2: name2.to_string(),
480                    }
481                    .fail();
482                }
483
484                // if Name part is the last part, then the rest of the input is the value
485                (Part::Name(name), None) => {
486                    let value = chs[pos..].iter().collect::<String>();
487                    Self::process_name_value(name, value, &mut appends, &mut map);
488                }
489
490                // if Name part, and next part is Split, then find the matched value of the name
491                (Part::Name(name), Some(Part::Split(split))) => {
492                    let stop = split.chars().next().context(DissectInvalidPatternSnafu {
493                        s: &pattern.origin,
494                        detail: "Empty split is not allowed",
495                    })?; // this won't happen
496                    let mut end = pos;
497                    while end < chs.len() && chs[end] != stop {
498                        end += 1;
499                    }
500
501                    if !name.is_name_empty() {
502                        let value = chs[pos..end].iter().collect::<String>();
503                        Self::process_name_value(name, value, &mut appends, &mut map);
504                    }
505
506                    if name.is_end_modifier_set() {
507                        while end < chs.len() && chs[end] == stop {
508                            end += 1;
509                        }
510                        end -= 1; // leave the last stop character to match the next split
511                    }
512
513                    pos = end;
514                }
515            }
516        }
517
518        if !appends.is_empty() {
519            let sep = match self.append_separator {
520                Some(ref sep) => sep,
521                None => " ",
522            };
523
524            for (name, mut values) in appends {
525                values.sort_by(|a, b| a.1.cmp(&b.1));
526                let value = values.into_iter().map(|(a, _)| a).join(sep);
527                map.push((name, Value::String(value)));
528            }
529        }
530
531        Ok(map.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
532    }
533
534    fn process(&self, val: &str) -> Result<Vec<(String, Value)>> {
535        let chs = val.chars().collect::<Vec<char>>();
536
537        for pattern in &self.patterns {
538            if let Ok(map) = self.process_pattern(&chs, pattern) {
539                return Ok(map);
540            }
541        }
542        DissectNoMatchingPatternSnafu.fail()
543    }
544}
545
546impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
547    type Error = Error;
548
549    fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
550        let mut fields = Fields::default();
551        let mut patterns = vec![];
552        let mut ignore_missing = false;
553        let mut append_separator = None;
554
555        for (k, v) in value.iter() {
556            let key = k
557                .as_str()
558                .with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;
559
560            match key {
561                FIELD_NAME => {
562                    fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
563                }
564                FIELDS_NAME => {
565                    fields = yaml_new_fields(v, FIELDS_NAME)?;
566                }
567                PATTERN_NAME => {
568                    let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?;
569                    patterns = vec![pattern];
570                }
571                PATTERNS_NAME => {
572                    patterns = yaml_parse_strings(v, PATTERNS_NAME)?;
573                }
574                IGNORE_MISSING_NAME => {
575                    ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
576                }
577                APPEND_SEPARATOR_NAME => {
578                    append_separator = Some(yaml_string(v, APPEND_SEPARATOR_NAME)?);
579                }
580                _ => {}
581            }
582        }
583        // let output_keys = Self::build_output_keys(&patterns);
584        let builder = DissectProcessor {
585            fields,
586            patterns,
587            ignore_missing,
588            append_separator,
589        };
590
591        Ok(builder)
592    }
593}
594
595impl Processor for DissectProcessor {
596    fn kind(&self) -> &str {
597        PROCESSOR_DISSECT
598    }
599
600    fn ignore_missing(&self) -> bool {
601        self.ignore_missing
602    }
603
604    fn exec_mut(&self, mut val: PipelineMap) -> Result<PipelineMap> {
605        for field in self.fields.iter() {
606            let index = field.input_field();
607            match val.get(index) {
608                Some(Value::String(val_str)) => {
609                    let r = self.process(val_str)?;
610                    for (k, v) in r {
611                        val.insert(k, v);
612                    }
613                }
614                Some(Value::Null) | None => {
615                    if !self.ignore_missing {
616                        return ProcessorMissingFieldSnafu {
617                            processor: self.kind(),
618                            field: field.input_field(),
619                        }
620                        .fail();
621                    }
622                }
623                Some(v) => {
624                    return ProcessorExpectStringSnafu {
625                        processor: self.kind(),
626                        v: v.clone(),
627                    }
628                    .fail();
629                }
630            }
631        }
632        Ok(val)
633    }
634}
635
/// Returns `true` when `ch` may be part of a capture name: an underscore or any
/// alphanumeric character.
fn is_valid_char(ch: char) -> bool {
    match ch {
        '_' => true,
        other => other.is_alphanumeric(),
    }
}
639
640#[cfg(test)]
641mod tests {
642    use ahash::HashMap;
643
644    use super::{DissectProcessor, EndModifier, Name, Part, StartModifier};
645    use crate::etl::processor::dissect::Pattern;
646    use crate::etl::value::Value;
647
648    fn assert(pattern_str: &str, input: &str, expected: HashMap<String, Value>) {
649        let chs = input.chars().collect::<Vec<char>>();
650        let patterns: Vec<Pattern> = vec![pattern_str.parse().unwrap()];
651
652        let processor = DissectProcessor::default();
653        let result: HashMap<String, Value> = processor
654            .process_pattern(&chs, &patterns[0])
655            .unwrap()
656            .into_iter()
657            .collect();
658
659        assert_eq!(result, expected, "pattern: {}", pattern_str);
660    }
661
662    #[test]
663    fn test_dissect_simple_pattern() {
664        let cases = [(
665            "%{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}",
666            vec![
667                Part::Name("clientip".into()),
668                Part::Split(" ".into()),
669                Part::Name("ident".into()),
670                Part::Split(" ".into()),
671                Part::Name("auth".into()),
672                Part::Split(" [".into()),
673                Part::Name("timestamp".into()),
674                Part::Split("] \"".into()),
675                Part::Name("verb".into()),
676                Part::Split(" ".into()),
677                Part::Name("request".into()),
678                Part::Split(" HTTP/".into()),
679                Part::Name("httpversion".into()),
680                Part::Split("\" ".into()),
681                Part::Name("status".into()),
682                Part::Split(" ".into()),
683                Part::Name("size".into()),
684            ],
685        )];
686
687        for (pattern, expected) in cases.into_iter() {
688            let p: Pattern = pattern.parse().unwrap();
689            assert_eq!(p.parts, expected);
690        }
691    }
692
693    #[test]
694    fn test_dissect_modifier_pattern() {
695        let cases = [
696            (
697                "%{} %{}",
698                vec![
699                    Part::Name(Name {
700                        name: "".into(),
701                        start_modifier: None,
702                        end_modifier: None,
703                    }),
704                    Part::Split(" ".into()),
705                    Part::Name(Name {
706                        name: "".into(),
707                        start_modifier: None,
708                        end_modifier: None,
709                    }),
710                ],
711            ),
712            (
713                "%{ts->} %{level}",
714                vec![
715                    Part::Name(Name {
716                        name: "ts".into(),
717                        start_modifier: None,
718                        end_modifier: Some(EndModifier),
719                    }),
720                    Part::Split(" ".into()),
721                    Part::Name("level".into()),
722                ],
723            ),
724            (
725                "[%{ts}]%{->}[%{level}]",
726                vec![
727                    Part::Split("[".into()),
728                    Part::Name(Name {
729                        name: "ts".into(),
730                        start_modifier: None,
731                        end_modifier: None,
732                    }),
733                    Part::Split("]".into()),
734                    Part::Name(Name {
735                        name: "".into(),
736                        start_modifier: None,
737                        end_modifier: Some(EndModifier),
738                    }),
739                    Part::Split("[".into()),
740                    Part::Name(Name {
741                        name: "level".into(),
742                        start_modifier: None,
743                        end_modifier: None,
744                    }),
745                    Part::Split("]".into()),
746                ],
747            ),
748            (
749                "%{+name} %{+name} %{+name} %{+name}",
750                vec![
751                    Part::Name(Name {
752                        name: "name".into(),
753                        start_modifier: Some(StartModifier::Append(None)),
754                        end_modifier: None,
755                    }),
756                    Part::Split(" ".into()),
757                    Part::Name(Name {
758                        name: "name".into(),
759                        start_modifier: Some(StartModifier::Append(None)),
760                        end_modifier: None,
761                    }),
762                    Part::Split(" ".into()),
763                    Part::Name(Name {
764                        name: "name".into(),
765                        start_modifier: Some(StartModifier::Append(None)),
766                        end_modifier: None,
767                    }),
768                    Part::Split(" ".into()),
769                    Part::Name(Name {
770                        name: "name".into(),
771                        start_modifier: Some(StartModifier::Append(None)),
772                        end_modifier: None,
773                    }),
774                ],
775            ),
776            (
777                "%{+name/2} %{+name/4} %{+name/3} %{+name/1}",
778                vec![
779                    Part::Name(Name {
780                        name: "name".into(),
781                        start_modifier: Some(StartModifier::Append(Some(2))),
782                        end_modifier: None,
783                    }),
784                    Part::Split(" ".into()),
785                    Part::Name(Name {
786                        name: "name".into(),
787                        start_modifier: Some(StartModifier::Append(Some(4))),
788                        end_modifier: None,
789                    }),
790                    Part::Split(" ".into()),
791                    Part::Name(Name {
792                        name: "name".into(),
793                        start_modifier: Some(StartModifier::Append(Some(3))),
794                        end_modifier: None,
795                    }),
796                    Part::Split(" ".into()),
797                    Part::Name(Name {
798                        name: "name".into(),
799                        start_modifier: Some(StartModifier::Append(Some(1))),
800                        end_modifier: None,
801                    }),
802                ],
803            ),
804            (
805                "%{clientip} %{?ident} %{?auth} [%{timestamp}]",
806                vec![
807                    Part::Name(Name {
808                        name: "clientip".into(),
809                        start_modifier: None,
810                        end_modifier: None,
811                    }),
812                    Part::Split(" ".into()),
813                    Part::Name(Name {
814                        name: "ident".into(),
815                        start_modifier: Some(StartModifier::NamedSkip),
816                        end_modifier: None,
817                    }),
818                    Part::Split(" ".into()),
819                    Part::Name(Name {
820                        name: "auth".into(),
821                        start_modifier: Some(StartModifier::NamedSkip),
822                        end_modifier: None,
823                    }),
824                    Part::Split(" [".into()),
825                    Part::Name(Name {
826                        name: "timestamp".into(),
827                        start_modifier: None,
828                        end_modifier: None,
829                    }),
830                    Part::Split("]".into()),
831                ],
832            ),
833            (
834                "[%{ts}] [%{level}] %{*p1}:%{&p1} %{*p2}:%{&p2}",
835                vec![
836                    Part::Split("[".into()),
837                    Part::Name(Name {
838                        name: "ts".into(),
839                        start_modifier: None,
840                        end_modifier: None,
841                    }),
842                    Part::Split("] [".into()),
843                    Part::Name(Name {
844                        name: "level".into(),
845                        start_modifier: None,
846                        end_modifier: None,
847                    }),
848                    Part::Split("] ".into()),
849                    Part::Name(Name {
850                        name: "p1".into(),
851                        start_modifier: Some(StartModifier::MapKey),
852                        end_modifier: None,
853                    }),
854                    Part::Split(":".into()),
855                    Part::Name(Name {
856                        name: "p1".into(),
857                        start_modifier: Some(StartModifier::MapVal),
858                        end_modifier: None,
859                    }),
860                    Part::Split(" ".into()),
861                    Part::Name(Name {
862                        name: "p2".into(),
863                        start_modifier: Some(StartModifier::MapKey),
864                        end_modifier: None,
865                    }),
866                    Part::Split(":".into()),
867                    Part::Name(Name {
868                        name: "p2".into(),
869                        start_modifier: Some(StartModifier::MapVal),
870                        end_modifier: None,
871                    }),
872                ],
873            ),
874            (
875                "%{&p1}:%{*p1}",
876                vec![
877                    Part::Name(Name {
878                        name: "p1".into(),
879                        start_modifier: Some(StartModifier::MapVal),
880                        end_modifier: None,
881                    }),
882                    Part::Split(":".into()),
883                    Part::Name(Name {
884                        name: "p1".into(),
885                        start_modifier: Some(StartModifier::MapKey),
886                        end_modifier: None,
887                    }),
888                ],
889            ),
890        ];
891
892        for (pattern, expected) in cases.into_iter() {
893            let p: Pattern = pattern.parse().unwrap();
894            assert_eq!(p.parts, expected);
895        }
896    }
897
898    #[test]
899    fn test_dissect_invalid_pattern() {
900        let cases = [
901            ("", "Empty pattern is not allowed"),
902            (
903                "%{name1}%{name2}",
904                "Invalid Pattern: '%{name1}%{name2}'. consecutive names are not allowed: 'name1' 'name2'"
905            ),
906            (
907                "%{} %{ident",
908                "Invalid Pattern: '%{} %{ident'. 'ident' is not closed",
909            ),
910            (
911                "%{->clientip} ",
912                "Invalid Pattern: '%{->clientip} '. Expected '}' after '->'",
913            ),
914            (
915                "%{/clientip} ",
916                "Invalid Pattern: '%{/clientip} '. Invalid '/'",
917            ),
918            (
919                "%{+?clientip} ",
920                "Invalid Pattern: '%{+?clientip} '. Invalid '?'",
921            ),
922            (
923                "%{+clientip/} ",
924                "Invalid Pattern: '%{+clientip/} '. Digit order must be set after '/'",
925            ),
926            (
927                "%{+clientip/a} ",
928                "Invalid Pattern: '%{+clientip/a} '. Digit order must be set after '/'",
929            ),
930            (
931                "%{clientip/1} ",
932                "Invalid Pattern: '%{clientip/1} '. Invalid '/' in 'clientip'",
933            ),
934            (
935                "%{+clientip/1/2} ",
936                "Append Order modifier is already set to '1', cannot be set to 2",
937            ),
938            (
939                "%{+/1} ",
940                "Invalid Pattern: '%{+/1} '. only '+/1' modifier is invalid",
941            ),
942            (
943                "%{+} ",
944                "Invalid Pattern: '%{+} '. only '+' modifier is invalid",
945            ),
946            (
947                "%{?} ",
948                "Invalid Pattern: '%{?} '. only '?' modifier is invalid",
949            ),
950            (
951                "%{*} ",
952                "Invalid Pattern: '%{*} '. only '*' modifier is invalid",
953            ),
954            (
955                "%{&} ",
956                "Invalid Pattern: '%{&} '. only '&' modifier is invalid",
957            ),
958            (
959                "%{*ip}",
960                "Invalid Pattern: '%{*ip}'. key and value not matched: 'ip'"
961            ),
962            (
963                "%{*ip} %{*ip}",
964                "Invalid Pattern: '%{*ip} %{*ip}'. Duplicate map key: 'ip'",
965            ),
966            (
967                "%{*ip1} %{&ip2}",
968                "Invalid Pattern: '%{*ip1} %{&ip2}'. key and value not matched: 'ip1,ip2'"
969            ),
970        ];
971
972        for (pattern, expected) in cases.into_iter() {
973            let err = pattern.parse::<Pattern>().unwrap_err();
974            assert_eq!(err.to_string(), expected);
975        }
976    }
977
978    #[test]
979    fn test_dissect_process() {
980        let expected = [
981            ("timestamp", "30/Apr/1998:22:00:52 +0000"),
982            ("status", "200"),
983            ("clientip", "1.2.3.4"),
984            ("ident", "-"),
985            ("size", "3171"),
986            (
987                "request",
988                "/english/venues/cities/images/montpellier/18.gif",
989            ),
990            ("auth", "-"),
991            ("verb", "GET"),
992            ("httpversion", "1.0"),
993        ]
994        .into_iter()
995        .map(|(k, v)| (k.to_string(), Value::String(v.to_string())))
996        .collect::<HashMap<String, Value>>();
997
998        {
999            // pattern start with Name
1000            let pattern_str = "%{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}";
1001            let input = "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171";
1002
1003            assert(pattern_str, input, expected.clone());
1004        }
1005
1006        {
1007            // pattern start with Split
1008            let pattern_str = " %{clientip} %{ident} %{auth} [%{timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}";
1009            let input = " 1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171";
1010
1011            assert(pattern_str, input, expected);
1012        }
1013    }
1014
1015    #[test]
1016    fn test_dissect_right_padding_modifier() {
1017        let cases = [
1018            (
1019                "%{ts->} %{level}",
1020                "1998-08-10T17:15:42,466          WARN",
1021                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1022            ),
1023            (
1024                "[%{ts}]%{->}[%{level}]",
1025                "[1998-08-10T17:15:42,466]            [WARN]",
1026                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1027            ),
1028            (
1029                "[%{ts}]%{->}[%{level}]",
1030                "[1998-08-10T17:15:42,466]            [[[[WARN]",
1031                [("ts", "1998-08-10T17:15:42,466"), ("level", "WARN")],
1032            ),
1033        ]
1034        .into_iter()
1035        .map(|(pattern, input, expected)| {
1036            let map = expected
1037                .into_iter()
1038                .map(|(k, v)| (k.to_string(), Value::String(v.to_string())));
1039            (pattern, input, map)
1040        });
1041
1042        for (pattern_str, input, expected) in cases {
1043            assert(
1044                pattern_str,
1045                input,
1046                expected.collect::<HashMap<String, Value>>(),
1047            );
1048        }
1049    }
1050
1051    #[test]
1052    fn test_dissect_append_modifier() {
1053        let cases = [
1054            (
1055                "%{+name} %{+name} %{+name} %{+name}",
1056                "john jacob jingleheimer schmidt",
1057                [("name", "john jacob jingleheimer schmidt")],
1058            ),
1059            (
1060                "%{+name/2} %{+name/4} %{+name/3} %{+name/1}",
1061                "john jacob jingleheimer schmidt",
1062                [("name", "schmidt john jingleheimer jacob")],
1063            ),
1064        ]
1065        .into_iter()
1066        .map(|(pattern, input, expected)| {
1067            let map = expected
1068                .into_iter()
1069                .map(|(k, v)| (k.to_string(), Value::String(v.to_string())));
1070            (pattern, input, map)
1071        });
1072
1073        for (pattern_str, input, expected) in cases {
1074            assert(
1075                pattern_str,
1076                input,
1077                expected.collect::<HashMap<String, Value>>(),
1078            );
1079        }
1080    }
1081
1082    #[test]
1083    fn test_dissect_named_skip_modifier() {
1084        let cases = [(
1085            "%{clientip} %{?ident} %{?auth} [%{timestamp}]",
1086            "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]",
1087            [
1088                ("clientip", "1.2.3.4"),
1089                ("timestamp", "30/Apr/1998:22:00:52 +0000"),
1090            ],
1091        )]
1092        .into_iter()
1093        .map(|(pattern, input, expected)| {
1094            let map = expected
1095                .into_iter()
1096                .map(|(k, v)| (k.to_string(), Value::String(v.to_string())));
1097            (pattern, input, map)
1098        });
1099
1100        for (pattern_str, input, expected) in cases {
1101            assert(
1102                pattern_str,
1103                input,
1104                expected.collect::<HashMap<String, Value>>(),
1105            );
1106        }
1107    }
1108}