pipeline/
tablesuffix.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use dyn_fmt::AsStrFormatExt;
16use regex::Regex;
17use snafu::{ensure, OptionExt};
18use yaml_rust::Yaml;
19
20use crate::error::{
21    Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
22};
23use crate::{PipelineMap, Value};
24
/// Placeholder written into the parsed template in place of each `${variable}` reference.
const REPLACE_KEY: &str = "{}";

lazy_static::lazy_static! {
    // Matches one `${variable}` reference. Capture group 1 holds the text between
    // `${` and the next `}`, which must be at least one character long.
    static ref NAME_TPL: Regex = Regex::new(r"\$\{([^}]+)\}").unwrap();
}
30
/// TableSuffixTemplate is used to generate a suffix for the table name, so that the input data can be written to multiple tables.
/// The config should be placed at the end of the pipeline.
/// Use `${variable}` to refer to a variable in the pipeline context; the variable can come from the input data or be a processed result.
/// Note the variable should be an integer number or a string.
/// If any error occurs at runtime, no suffix will be added to the table name.
///
/// ```yaml
/// table_suffix: _${xxx}_${b}
/// ```
///
/// For example, if the template is `_${xxx}_${b}`, and the pipeline context is
/// `{"xxx": "123", "b": "456"}`, the generated table name suffix will be `_123_456`.
#[derive(Debug, PartialEq)]
pub(crate) struct TableSuffixTemplate {
    /// The suffix template in which every `${variable}` reference has been replaced
    /// by a `{}` placeholder.
    pub template: String,
    /// The (trimmed) variable names referenced by the template, in order of appearance.
    pub keys: Vec<String>,
}
48
49impl TableSuffixTemplate {
50    pub fn apply(&self, val: &PipelineMap) -> Option<String> {
51        let values = self
52            .keys
53            .iter()
54            .filter_map(|key| {
55                let v = val.get(key)?;
56                match v {
57                    Value::Int8(v) => Some(v.to_string()),
58                    Value::Int16(v) => Some(v.to_string()),
59                    Value::Int32(v) => Some(v.to_string()),
60                    Value::Int64(v) => Some(v.to_string()),
61                    Value::Uint8(v) => Some(v.to_string()),
62                    Value::Uint16(v) => Some(v.to_string()),
63                    Value::Uint32(v) => Some(v.to_string()),
64                    Value::Uint64(v) => Some(v.to_string()),
65                    Value::String(v) => Some(v.clone()),
66                    _ => None,
67                }
68            })
69            .collect::<Vec<_>>();
70        if values.len() != self.keys.len() {
71            return None;
72        }
73        Some(self.template.format(&values))
74    }
75}
76
77impl TryFrom<&Yaml> for TableSuffixTemplate {
78    type Error = Error;
79
80    fn try_from(value: &Yaml) -> Result<Self> {
81        let name_template = value
82            .as_str()
83            .context(RequiredTableSuffixTemplateSnafu)?
84            .to_string();
85
86        let mut keys = Vec::new();
87        for cap in NAME_TPL.captures_iter(&name_template) {
88            ensure!(
89                cap.len() >= 2,
90                InvalidTableSuffixTemplateSnafu {
91                    input: name_template.clone(),
92                }
93            );
94            let key = cap[1].trim().to_string();
95            keys.push(key);
96        }
97
98        let template = NAME_TPL
99            .replace_all(&name_template, REPLACE_KEY)
100            .to_string();
101
102        Ok(TableSuffixTemplate { template, keys })
103    }
104}
105
#[cfg(test)]
mod tests {
    use yaml_rust::YamlLoader;

    use crate::tablesuffix::TableSuffixTemplate;

    /// Parsing `_${xxx}_${b}` must produce a `{}`-placeholder template and the
    /// referenced keys in order of appearance.
    #[test]
    fn test_table_suffix_parsing() {
        let source = r#"
        table_suffix: _${xxx}_${b}
        "#;

        let documents = YamlLoader::load_from_str(source).expect("yaml should be loadable");
        let suffix_node = &documents[0]["table_suffix"];

        let parsed =
            TableSuffixTemplate::try_from(suffix_node).expect("template should be parsable");

        assert_eq!(parsed.template, "_{}_{}");
        assert_eq!(parsed.keys, vec!["xxx", "b"]);
    }
}