use std::sync::Arc;
use std::time::Duration;

use common_telemetry::debug;
use datatypes::timestamp::TimestampNanosecond;
use moka::sync::Cache;

use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
use crate::etl::Pipeline;
use crate::manager::PipelineVersion;
use crate::table::EMPTY_SCHEMA_NAME;
use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};

/// Maximum number of entries held by each cache.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Entries in the TTL-bound caches expire after this duration.
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);

/// In-memory caches for compiled pipelines and their original definitions.
pub(crate) struct PipelineCache {
    /// Compiled pipelines, keyed by schema, name, and version; entries expire after the TTL.
    pipelines: Cache<String, Arc<Pipeline>>,
    /// Original pipeline definitions, with the same keys and TTL as `pipelines`.
    original_pipelines: Cache<String, PipelineContent>,
    /// Copy of the original definitions kept without a TTL, read through
    /// `get_failover_cache` as a failover source.
    failover_cache: Cache<String, PipelineContent>,
}

/// A pipeline's raw definition together with its name, version, and schema.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PipelineContent {
    pub name: String,
    pub content: String,
    pub version: TimestampNanosecond,
    pub schema: String,
}

impl PipelineCache {
    /// Creates the three caches with their capacity and TTL settings.
    pub(crate) fn new() -> Self {
        Self {
            pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .name("pipelines")
                .build(),
            original_pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .name("original_pipelines")
                .build(),
            // No TTL here: failover entries stay until evicted by capacity.
            failover_cache: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .name("failover_cache")
                .build(),
        }
    }

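    /// Inserts a compiled pipeline under its versioned key and, when
    /// `with_latest` is set, under the version-less "latest" key as well.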
    pub(crate) fn insert_pipeline_cache(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
        pipeline: Arc<Pipeline>,
        with_latest: bool,
    ) {
        insert_cache_generic(
            &self.pipelines,
            schema,
            name,
            version,
            pipeline.clone(),
            with_latest,
        );
    }

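    /// Inserts a pipeline's original definition into both the TTL-bound
    /// `original_pipelines` cache and the no-TTL `failover_cache`.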
    pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
        let schema = pipeline.schema.as_str();
        let name = pipeline.name.as_str();
        let version = pipeline.version;
        insert_cache_generic(
            &self.original_pipelines,
            schema,
            name,
            Some(version),
            pipeline.clone(),
            with_latest,
        );
        insert_cache_generic(
            &self.failover_cache,
            schema,
            name,
            Some(version),
            pipeline.clone(),
            with_latest,
        );
    }

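    /// Looks up a compiled pipeline by schema, name, and version.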
    pub(crate) fn get_pipeline_cache(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<Arc<Pipeline>>> {
        get_cache_generic(&self.pipelines, schema, name, version)
    }

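    /// Looks up a pipeline's original definition in the failover cache.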
    pub(crate) fn get_failover_cache(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<PipelineContent>> {
        get_cache_generic(&self.failover_cache, schema, name, version)
    }

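    /// Looks up a pipeline's original definition in the TTL-bound cache.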
    pub(crate) fn get_pipeline_str_cache(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<PipelineContent>> {
        get_cache_generic(&self.original_pipelines, schema, name, version)
    }

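    /// Removes the given pipeline, both the exact version and the "latest"
    /// entry, from all three caches, regardless of schema.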
    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
        // Match both the exact version key and the "latest" key for this name.
        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);

        let ks = self
            .pipelines
            .iter()
            .filter_map(|(k, _)| {
                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
                    Some(k.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // Drop the matching keys from all three caches.
        for k in ks {
            let k = k.as_str();
            self.pipelines.remove(k);
            self.original_pipelines.remove(k);
            self.failover_cache.remove(k);
        }
    }
}

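/// Inserts `value` under the versioned cache key and, when `with_latest` is
/// set, under the version-less "latest" key as well.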
fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
    cache: &Cache<String, T>,
    schema: &str,
    name: &str,
    version: PipelineVersion,
    value: T,
    with_latest: bool,
) {
    let k = generate_pipeline_cache_key(schema, name, version);
    cache.insert(k, value.clone());
    if with_latest {
        let k = generate_pipeline_cache_key(schema, name, None);
        cache.insert(k, value);
    }
}

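/// Looks up a cached value by schema, name, and version.
///
/// Tries the empty-schema key first, then the schema-qualified key, and
/// finally scans for any key with a matching name/version suffix. If that
/// scan matches entries under more than one schema, a
/// `MultiPipelineWithDiffSchema` error is returned.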
fn get_cache_generic<T: Clone + Send + Sync + 'static>(
    cache: &Cache<String, T>,
    schema: &str,
    name: &str,
    version: PipelineVersion,
) -> Result<Option<T>> {
    // Check the empty-schema key first.
    let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
    if let Some(value) = cache.get(&emp_key) {
        return Ok(Some(value));
    }
    // Then check the key qualified with the given schema.
    let schema_k = generate_pipeline_cache_key(schema, name, version);
    if let Some(value) = cache.get(&schema_k) {
        return Ok(Some(value));
    }

    // Fall back to scanning for any key that matches the name/version suffix.
    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
    let mut ks = cache
        .iter()
        .filter(|e| e.0.ends_with(&suffix_key))
        .collect::<Vec<_>>();

    match ks.len() {
        0 => Ok(None),
        1 => {
            let (_, value) = ks.remove(0);
            Ok(Some(value))
        }
        _ => {
            // The same pipeline name/version exists under multiple schemas.
            debug!(
                "cache keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
                cache.iter().map(|e| e.0).collect::<Vec<_>>(),
                emp_key,
                schema_k,
                suffix_key
            );
            MultiPipelineWithDiffSchemaSnafu {
                name: name.to_string(),
                current_schema: schema.to_string(),
                schemas: ks
                    .iter()
                    .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
                    .collect::<Vec<_>>()
                    .join(","),
            }
            .fail()?
        }
    }
}