pipeline/
tablesuffix.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use dyn_fmt::AsStrFormatExt;
use regex::Regex;
use snafu::{ensure, OptionExt};
use yaml_rust::Yaml;

use crate::error::{
    Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
};
use crate::{PipelineMap, Value};

/// Positional placeholder that replaces each `${...}` reference when a template is parsed.
const REPLACE_KEY: &str = "{}";

lazy_static::lazy_static! {
    // Matches a `${...}` variable reference. Capture group 1 holds the text between
    // the braces (one or more characters other than `}`).
    static ref NAME_TPL: Regex = Regex::new(r"\$\{([^}]+)\}").unwrap();
}

/// TableSuffixTemplate is used to generate a suffix for the table name, so that the input data can be written to multiple tables.
/// The config should be placed at the end of the pipeline.
/// Use `${variable}` to refer to a variable in the pipeline context; the variable can come from the input data or be a processed result.
/// Note that the variable should be an integer number or a string.
/// If any error occurs at runtime, no suffix will be added to the table name.
///
/// ```yaml
/// table_suffix: _${xxx}_${b}
/// ```
///
/// For example, if the template is `_${xxx}_${b}`, and the pipeline context is
/// `{"xxx": "123", "b": "456"}`, the generated suffix will be `_123_456`.
#[derive(Debug, PartialEq)]
pub(crate) struct TableSuffixTemplate {
    /// The suffix template; when parsed from YAML, every `${...}` reference is replaced by `{}`.
    pub template: String,
    /// The variable names referenced by the template; when parsed from YAML they are
    /// trimmed and kept in order of appearance.
    pub keys: Vec<String>,
}

impl TableSuffixTemplate {
    /// Renders the suffix by filling the template placeholders with the values
    /// looked up in `val` for each entry of `keys`, in order.
    ///
    /// Returns `None` as soon as a key is absent from `val` or its value is
    /// neither an integer variant nor a string; otherwise returns the
    /// formatted suffix.
    pub fn apply(&self, val: &PipelineMap) -> Option<String> {
        let mut rendered = Vec::with_capacity(self.keys.len());

        for key in &self.keys {
            // A missing key aborts the whole rendering.
            let text = match val.get(key)? {
                Value::Int8(n) => n.to_string(),
                Value::Int16(n) => n.to_string(),
                Value::Int32(n) => n.to_string(),
                Value::Int64(n) => n.to_string(),
                Value::Uint8(n) => n.to_string(),
                Value::Uint16(n) => n.to_string(),
                Value::Uint32(n) => n.to_string(),
                Value::Uint64(n) => n.to_string(),
                Value::String(s) => s.clone(),
                // Any other value kind cannot be used in a suffix.
                _ => return None,
            };
            rendered.push(text);
        }

        Some(self.template.format(&rendered))
    }
}

impl TryFrom<&Yaml> for TableSuffixTemplate {
    type Error = Error;

    /// Builds a [`TableSuffixTemplate`] from a YAML string node.
    ///
    /// Each `${name}` reference contributes its trimmed `name` to `keys` (in
    /// order of appearance) and is replaced by `{}` in `template`.
    ///
    /// # Errors
    ///
    /// Fails with the `RequiredTableSuffixTemplateSnafu` context when `value`
    /// is not a YAML string, and with `InvalidTableSuffixTemplateSnafu` when a
    /// match does not expose the expected capture group.
    fn try_from(value: &Yaml) -> Result<Self> {
        let raw = value.as_str().context(RequiredTableSuffixTemplateSnafu)?;

        let keys = NAME_TPL
            .captures_iter(raw)
            .map(|caps| -> Result<String> {
                // Guard the index access below: group 1 must be present.
                ensure!(
                    caps.len() >= 2,
                    InvalidTableSuffixTemplateSnafu {
                        input: raw.to_string(),
                    }
                );
                Ok(caps[1].trim().to_string())
            })
            .collect::<Result<Vec<_>>>()?;

        // Swap every variable reference for the positional placeholder.
        let template = NAME_TPL.replace_all(raw, REPLACE_KEY).into_owned();

        Ok(TableSuffixTemplate { template, keys })
    }
}

#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use crate::tablesuffix::TableSuffixTemplate;

    /// Parsing a template with two variable references yields the placeholder
    /// template and the referenced keys in order.
    #[test]
    fn test_table_suffix_parsing() {
        let source = r#"
        table_suffix: _${xxx}_${b}
        "#;

        let docs = YamlLoader::load_from_str(source).expect("YAML should load");
        let node = &docs[0]["table_suffix"];

        let parsed = TableSuffixTemplate::try_from(node).expect("template should parse");

        assert_eq!(parsed.template, "_{}_{}");
        assert_eq!(parsed.keys, vec!["xxx", "b"]);
    }
}