pipeline/manager/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_time::Timestamp;
16use datafusion_expr::{col, lit, Expr};
17use datatypes::timestamp::TimestampNanosecond;
18
19use crate::error::{InvalidPipelineVersionSnafu, Result};
20use crate::table::{
21    PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME,
22};
23use crate::PipelineVersion;
24
25pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion> {
26    match version_str {
27        Some(version) => {
28            let ts = Timestamp::from_str_utc(version)
29                .map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
30            Ok(Some(TimestampNanosecond(ts)))
31        }
32        None => Ok(None),
33    }
34}
35
36pub(crate) fn prepare_dataframe_conditions(name: &str, version: PipelineVersion) -> Expr {
37    let mut conditions = vec![col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name))];
38
39    if let Some(v) = version {
40        conditions
41            .push(col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())));
42    }
43
44    conditions.into_iter().reduce(Expr::and).unwrap()
45}
46
47pub(crate) fn generate_pipeline_cache_key(
48    schema: &str,
49    name: &str,
50    version: PipelineVersion,
51) -> String {
52    match version {
53        Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)),
54        None => format!("{}/{}/latest", schema, name),
55    }
56}
57
58pub(crate) fn generate_pipeline_cache_key_suffix(name: &str, version: PipelineVersion) -> String {
59    match version {
60        Some(version) => format!("/{}/{}", name, i64::from(version)),
61        None => format!("/{}/latest", name),
62    }
63}
64
65#[cfg(test)]
66mod tests {
67    use super::*;
68
69    #[test]
70    fn test_to_pipeline_version() {
71        let none_result = to_pipeline_version(None);
72        assert!(none_result.is_ok());
73        assert!(none_result.unwrap().is_none());
74
75        let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z"));
76        assert!(some_result.is_ok());
77        assert_eq!(
78            some_result.unwrap(),
79            Some(TimestampNanosecond::new(1672531200000000000))
80        );
81
82        let invalid = to_pipeline_version(Some("invalid"));
83        assert!(invalid.is_err());
84    }
85
86    #[test]
87    fn test_generate_pipeline_cache_key() {
88        let schema = "test_schema";
89        let name = "test_name";
90        let latest = generate_pipeline_cache_key(schema, name, None);
91        assert_eq!(latest, "test_schema/test_name/latest");
92
93        let versioned = generate_pipeline_cache_key(
94            schema,
95            name,
96            Some(TimestampNanosecond::new(1672531200000000000)),
97        );
98        assert_eq!(versioned, "test_schema/test_name/1672531200000000000");
99    }
100}