1use common_time::Timestamp;
16use datafusion_expr::{col, lit, Expr};
17use datatypes::timestamp::TimestampNanosecond;
18
19use crate::error::{InvalidPipelineVersionSnafu, Result};
20use crate::table::{
21 PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME,
22};
23use crate::PipelineVersion;
24
25pub fn to_pipeline_version(version_str: Option<&str>) -> Result<PipelineVersion> {
26 match version_str {
27 Some(version) => {
28 let ts = Timestamp::from_str_utc(version)
29 .map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
30 Ok(Some(TimestampNanosecond(ts)))
31 }
32 None => Ok(None),
33 }
34}
35
36pub(crate) fn prepare_dataframe_conditions(name: &str, version: PipelineVersion) -> Expr {
37 let mut conditions = vec![col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name))];
38
39 if let Some(v) = version {
40 conditions
41 .push(col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())));
42 }
43
44 conditions.into_iter().reduce(Expr::and).unwrap()
45}
46
47pub(crate) fn generate_pipeline_cache_key(
48 schema: &str,
49 name: &str,
50 version: PipelineVersion,
51) -> String {
52 match version {
53 Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)),
54 None => format!("{}/{}/latest", schema, name),
55 }
56}
57
58pub(crate) fn generate_pipeline_cache_key_suffix(name: &str, version: PipelineVersion) -> String {
59 match version {
60 Some(version) => format!("/{}/{}", name, i64::from(version)),
61 None => format!("/{}/latest", name),
62 }
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the three input shapes: absent, parseable, and unparseable version strings.
    #[test]
    fn test_to_pipeline_version() {
        // No version requested: succeeds with `None`.
        let absent = to_pipeline_version(None);
        assert!(absent.is_ok());
        assert_eq!(absent.unwrap(), None);

        // A parseable timestamp string becomes a nanosecond timestamp.
        let parsed = to_pipeline_version(Some("2023-01-01 00:00:00Z"));
        assert!(parsed.is_ok());
        let expected = Some(TimestampNanosecond::new(1672531200000000000));
        assert_eq!(parsed.unwrap(), expected);

        // Anything that is not a timestamp is rejected.
        assert!(to_pipeline_version(Some("invalid")).is_err());
    }

    /// Checks both the `latest` key and the explicitly versioned key.
    #[test]
    fn test_generate_pipeline_cache_key() {
        let (schema, name) = ("test_schema", "test_name");

        assert_eq!(
            generate_pipeline_cache_key(schema, name, None),
            "test_schema/test_name/latest"
        );

        let pinned = Some(TimestampNanosecond::new(1672531200000000000));
        assert_eq!(
            generate_pipeline_cache_key(schema, name, pinned),
            "test_schema/test_name/1672531200000000000"
        );
    }
}