pipeline/manager/
pipeline_cache.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use datatypes::timestamp::TimestampNanosecond;
19use moka::sync::Cache;
20
21use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
22use crate::etl::Pipeline;
23use crate::manager::PipelineVersion;
24use crate::table::EMPTY_SCHEMA_NAME;
25use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
26
/// Maximum capacity handed to each inner pipeline cache.
const PIPELINES_CACHE_SIZE: u64 = 10_000;
/// Time-to-live applied to every entry of the inner pipeline caches.
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
31
32pub(crate) struct PipelineCache {
35 pipelines: Cache<String, Arc<Pipeline>>,
36 original_pipelines: Cache<String, (String, TimestampNanosecond)>,
37}
38
39impl PipelineCache {
40 pub(crate) fn new() -> Self {
41 Self {
42 pipelines: Cache::builder()
43 .max_capacity(PIPELINES_CACHE_SIZE)
44 .time_to_live(PIPELINES_CACHE_TTL)
45 .build(),
46 original_pipelines: Cache::builder()
47 .max_capacity(PIPELINES_CACHE_SIZE)
48 .time_to_live(PIPELINES_CACHE_TTL)
49 .build(),
50 }
51 }
52
53 pub(crate) fn insert_pipeline_cache(
54 &self,
55 schema: &str,
56 name: &str,
57 version: PipelineVersion,
58 pipeline: Arc<Pipeline>,
59 with_latest: bool,
60 ) {
61 insert_cache_generic(
62 &self.pipelines,
63 schema,
64 name,
65 version,
66 pipeline,
67 with_latest,
68 );
69 }
70
71 pub(crate) fn insert_pipeline_str_cache(
72 &self,
73 schema: &str,
74 name: &str,
75 version: PipelineVersion,
76 pipeline: (String, TimestampNanosecond),
77 with_latest: bool,
78 ) {
79 insert_cache_generic(
80 &self.original_pipelines,
81 schema,
82 name,
83 version,
84 pipeline,
85 with_latest,
86 );
87 }
88
89 pub(crate) fn get_pipeline_cache(
90 &self,
91 schema: &str,
92 name: &str,
93 version: PipelineVersion,
94 ) -> Result<Option<Arc<Pipeline>>> {
95 get_cache_generic(&self.pipelines, schema, name, version)
96 }
97
98 pub(crate) fn get_pipeline_str_cache(
99 &self,
100 schema: &str,
101 name: &str,
102 version: PipelineVersion,
103 ) -> Result<Option<(String, TimestampNanosecond)>> {
104 get_cache_generic(&self.original_pipelines, schema, name, version)
105 }
106
107 pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
109 let version_suffix = generate_pipeline_cache_key_suffix(name, version);
110 let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
111
112 let ks = self
113 .pipelines
114 .iter()
115 .filter_map(|(k, _)| {
116 if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
117 Some(k.clone())
118 } else {
119 None
120 }
121 })
122 .collect::<Vec<_>>();
123
124 for k in ks {
125 let k = k.as_str();
126 self.pipelines.remove(k);
127 self.original_pipelines.remove(k);
128 }
129 }
130}
131
132fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
133 cache: &Cache<String, T>,
134 schema: &str,
135 name: &str,
136 version: PipelineVersion,
137 value: T,
138 with_latest: bool,
139) {
140 let k = generate_pipeline_cache_key(schema, name, version);
141 cache.insert(k, value.clone());
142 if with_latest {
143 let k = generate_pipeline_cache_key(schema, name, None);
144 cache.insert(k, value);
145 }
146}
147
148fn get_cache_generic<T: Clone + Send + Sync + 'static>(
149 cache: &Cache<String, T>,
150 schema: &str,
151 name: &str,
152 version: PipelineVersion,
153) -> Result<Option<T>> {
154 let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
156 if let Some(value) = cache.get(&k) {
157 return Ok(Some(value));
158 }
159 let k = generate_pipeline_cache_key(schema, name, version);
161 if let Some(value) = cache.get(&k) {
162 return Ok(Some(value));
163 }
164
165 let suffix_key = generate_pipeline_cache_key_suffix(name, version);
167 let mut ks = cache
168 .iter()
169 .filter(|e| e.0.ends_with(&suffix_key))
170 .collect::<Vec<_>>();
171
172 match ks.len() {
173 0 => Ok(None),
174 1 => Ok(Some(ks.remove(0).1)),
175 _ => MultiPipelineWithDiffSchemaSnafu {
176 schemas: ks
177 .iter()
178 .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
179 .collect::<Vec<_>>()
180 .join(","),
181 }
182 .fail()?,
183 }
184}