1use std::sync::Arc;
16use std::time::Duration;
17
18use datatypes::timestamp::TimestampNanosecond;
19use moka::sync::Cache;
20
21use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
22use crate::etl::Pipeline;
23use crate::manager::PipelineVersion;
24use crate::table::EMPTY_SCHEMA_NAME;
25use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
26
27const PIPELINES_CACHE_SIZE: u64 = 10000;
29const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
31
32pub(crate) struct PipelineCache {
35 pipelines: Cache<String, Arc<Pipeline>>,
36 original_pipelines: Cache<String, (String, TimestampNanosecond)>,
37 failover_cache: Cache<String, (String, TimestampNanosecond)>,
40}
41
42impl PipelineCache {
43 pub(crate) fn new() -> Self {
44 Self {
45 pipelines: Cache::builder()
46 .max_capacity(PIPELINES_CACHE_SIZE)
47 .time_to_live(PIPELINES_CACHE_TTL)
48 .build(),
49 original_pipelines: Cache::builder()
50 .max_capacity(PIPELINES_CACHE_SIZE)
51 .time_to_live(PIPELINES_CACHE_TTL)
52 .build(),
53 failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
54 }
55 }
56
57 pub(crate) fn insert_pipeline_cache(
58 &self,
59 schema: &str,
60 name: &str,
61 version: PipelineVersion,
62 pipeline: Arc<Pipeline>,
63 with_latest: bool,
64 ) {
65 insert_cache_generic(
66 &self.pipelines,
67 schema,
68 name,
69 version,
70 pipeline.clone(),
71 with_latest,
72 );
73 }
74
75 pub(crate) fn insert_pipeline_str_cache(
76 &self,
77 schema: &str,
78 name: &str,
79 version: PipelineVersion,
80 pipeline: (String, TimestampNanosecond),
81 with_latest: bool,
82 ) {
83 insert_cache_generic(
84 &self.original_pipelines,
85 schema,
86 name,
87 version,
88 pipeline.clone(),
89 with_latest,
90 );
91 insert_cache_generic(
92 &self.failover_cache,
93 schema,
94 name,
95 version,
96 pipeline,
97 with_latest,
98 );
99 }
100
101 pub(crate) fn get_pipeline_cache(
102 &self,
103 schema: &str,
104 name: &str,
105 version: PipelineVersion,
106 ) -> Result<Option<Arc<Pipeline>>> {
107 get_cache_generic(&self.pipelines, schema, name, version)
108 }
109
110 pub(crate) fn get_failover_cache(
111 &self,
112 schema: &str,
113 name: &str,
114 version: PipelineVersion,
115 ) -> Result<Option<(String, TimestampNanosecond)>> {
116 get_cache_generic(&self.failover_cache, schema, name, version)
117 }
118
119 pub(crate) fn get_pipeline_str_cache(
120 &self,
121 schema: &str,
122 name: &str,
123 version: PipelineVersion,
124 ) -> Result<Option<(String, TimestampNanosecond)>> {
125 get_cache_generic(&self.original_pipelines, schema, name, version)
126 }
127
128 pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
130 let version_suffix = generate_pipeline_cache_key_suffix(name, version);
131 let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
132
133 let ks = self
134 .pipelines
135 .iter()
136 .filter_map(|(k, _)| {
137 if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
138 Some(k.clone())
139 } else {
140 None
141 }
142 })
143 .collect::<Vec<_>>();
144
145 for k in ks {
146 let k = k.as_str();
147 self.pipelines.remove(k);
148 self.original_pipelines.remove(k);
149 self.failover_cache.remove(k);
150 }
151 }
152}
153
154fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
155 cache: &Cache<String, T>,
156 schema: &str,
157 name: &str,
158 version: PipelineVersion,
159 value: T,
160 with_latest: bool,
161) {
162 let k = generate_pipeline_cache_key(schema, name, version);
163 cache.insert(k, value.clone());
164 if with_latest {
165 let k = generate_pipeline_cache_key(schema, name, None);
166 cache.insert(k, value);
167 }
168}
169
170fn get_cache_generic<T: Clone + Send + Sync + 'static>(
171 cache: &Cache<String, T>,
172 schema: &str,
173 name: &str,
174 version: PipelineVersion,
175) -> Result<Option<T>> {
176 let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
178 if let Some(value) = cache.get(&k) {
179 return Ok(Some(value));
180 }
181 let k = generate_pipeline_cache_key(schema, name, version);
183 if let Some(value) = cache.get(&k) {
184 return Ok(Some(value));
185 }
186
187 let suffix_key = generate_pipeline_cache_key_suffix(name, version);
189 let mut ks = cache
190 .iter()
191 .filter(|e| e.0.ends_with(&suffix_key))
192 .collect::<Vec<_>>();
193
194 match ks.len() {
195 0 => Ok(None),
196 1 => Ok(Some(ks.remove(0).1)),
197 _ => MultiPipelineWithDiffSchemaSnafu {
198 schemas: ks
199 .iter()
200 .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
201 .collect::<Vec<_>>()
202 .join(","),
203 }
204 .fail()?,
205 }
206}