pipeline/
tablesuffix.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use dyn_fmt::AsStrFormatExt;
16use regex::Regex;
17use snafu::{ensure, OptionExt};
18use vrl::value::Value as VrlValue;
19use yaml_rust::Yaml;
20
21use crate::error::{
22    Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
23};
24
25const REPLACE_KEY: &str = "{}";
26
27lazy_static::lazy_static! {
28    static ref NAME_TPL: Regex = Regex::new(r"\$\{([^}]+)\}").unwrap();
29}
30
31/// TableSuffixTemplate is used to generate suffix for the table name, so that the input data can be written to multiple tables.
32/// The config should be placed at the end of the pipeline.
33/// Use `${variable}` to refer to the variable in the pipeline context, the viarable can be from input data or be a processed result.
34/// Note the variable should be an integer number or a string.
35/// In case of any error occurs during runtime, no suffix will be added to the table name.
36///
37/// ```yaml
38/// table_suffix: _${xxx}_${b}
39/// ```
40///
41/// For example, if the template is `_${xxx}_${b}`, and the pipeline context is
42/// `{"xxx": "123", "b": "456"}`, the generated table name will be `_123_456`.
43#[derive(Debug, PartialEq)]
44pub(crate) struct TableSuffixTemplate {
45    pub template: String,
46    pub keys: Vec<String>,
47}
48
49impl TableSuffixTemplate {
50    pub fn apply(&self, val: &VrlValue) -> Option<String> {
51        let val = val.as_object()?;
52        let values = self
53            .keys
54            .iter()
55            .filter_map(|key| {
56                let v = val.get(key.as_str())?;
57                match v {
58                    VrlValue::Integer(v) => Some(v.to_string()),
59                    VrlValue::Bytes(v) => Some(String::from_utf8_lossy_owned(v.to_vec())),
60                    _ => None,
61                }
62            })
63            .collect::<Vec<_>>();
64        if values.len() != self.keys.len() {
65            return None;
66        }
67        Some(self.template.format(&values))
68    }
69}
70
71impl TryFrom<&Yaml> for TableSuffixTemplate {
72    type Error = Error;
73
74    fn try_from(value: &Yaml) -> Result<Self> {
75        let name_template = value
76            .as_str()
77            .context(RequiredTableSuffixTemplateSnafu)?
78            .to_string();
79
80        let mut keys = Vec::new();
81        for cap in NAME_TPL.captures_iter(&name_template) {
82            ensure!(
83                cap.len() >= 2,
84                InvalidTableSuffixTemplateSnafu {
85                    input: name_template.clone(),
86                }
87            );
88            let key = cap[1].trim().to_string();
89            keys.push(key);
90        }
91
92        let template = NAME_TPL
93            .replace_all(&name_template, REPLACE_KEY)
94            .to_string();
95
96        Ok(TableSuffixTemplate { template, keys })
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use yaml_rust::YamlLoader;
103
104    use crate::tablesuffix::TableSuffixTemplate;
105
106    #[test]
107    fn test_table_suffix_parsing() {
108        let yaml = r#"
109        table_suffix: _${xxx}_${b}
110        "#;
111        let config = YamlLoader::load_from_str(yaml);
112        assert!(config.is_ok());
113        let config = config.unwrap()[0]["table_suffix"].clone();
114        let name_template = TableSuffixTemplate::try_from(&config);
115        assert!(name_template.is_ok());
116        let name_template = name_template.unwrap();
117        assert_eq!(name_template.template, "_{}_{}");
118        assert_eq!(name_template.keys, vec!["xxx", "b"]);
119    }
120}