// pipeline/manager/pipeline_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use common_telemetry::debug;
19use datatypes::timestamp::TimestampNanosecond;
20use moka::sync::Cache;
21
22use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
23use crate::etl::Pipeline;
24use crate::manager::PipelineVersion;
25use crate::table::EMPTY_SCHEMA_NAME;
26use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
27
/// Maximum number of entries each inner pipeline cache may hold.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Time-to-live for the `pipelines` and `original_pipelines` caches.
/// The failover cache deliberately gets no TTL (see `PipelineCache::new`).
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
32
/// Pipeline cache is located on a separate file on purpose,
/// to encapsulate inner cache. Only public methods are exposed.
pub(crate) struct PipelineCache {
    /// Compiled, ready-to-run pipelines; entries expire after
    /// `PIPELINES_CACHE_TTL`. Keys come from `generate_pipeline_cache_key`
    /// (presumably `schema/name/version` — the schema part is recovered with
    /// `split_once('/')` in `get_cache_generic`; confirm against `crate::util`).
    pipelines: Cache<String, Arc<Pipeline>>,
    /// Original (uncompiled) pipeline definitions, same keying and TTL as
    /// `pipelines`.
    original_pipelines: Cache<String, PipelineContent>,
    /// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
    /// The failover cache never expires, but it will be updated when the pipelines cache is updated.
    /// (It is still bounded by `PIPELINES_CACHE_SIZE`, so eviction by capacity can occur.)
    failover_cache: Cache<String, PipelineContent>,
}
42
/// The stored form of a pipeline, as read from / written to the pipeline table.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PipelineContent {
    /// Pipeline name (part of the cache key).
    pub name: String,
    /// Raw pipeline definition text. NOTE(review): presumably the original
    /// source later compiled into a `Pipeline` — confirm at the call sites.
    pub content: String,
    /// Pipeline version; a nanosecond timestamp.
    pub version: TimestampNanosecond,
    /// Schema the pipeline belongs to (part of the cache key).
    pub schema: String,
}
50
51impl PipelineCache {
52    pub(crate) fn new() -> Self {
53        Self {
54            pipelines: Cache::builder()
55                .max_capacity(PIPELINES_CACHE_SIZE)
56                .time_to_live(PIPELINES_CACHE_TTL)
57                .name("pipelines")
58                .build(),
59            original_pipelines: Cache::builder()
60                .max_capacity(PIPELINES_CACHE_SIZE)
61                .time_to_live(PIPELINES_CACHE_TTL)
62                .name("original_pipelines")
63                .build(),
64            failover_cache: Cache::builder()
65                .max_capacity(PIPELINES_CACHE_SIZE)
66                .name("failover_cache")
67                .build(),
68        }
69    }
70
71    pub(crate) fn insert_pipeline_cache(
72        &self,
73        schema: &str,
74        name: &str,
75        version: PipelineVersion,
76        pipeline: Arc<Pipeline>,
77        with_latest: bool,
78    ) {
79        insert_cache_generic(
80            &self.pipelines,
81            schema,
82            name,
83            version,
84            pipeline.clone(),
85            with_latest,
86        );
87    }
88
89    pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
90        let schema = pipeline.schema.as_str();
91        let name = pipeline.name.as_str();
92        let version = pipeline.version;
93        insert_cache_generic(
94            &self.original_pipelines,
95            schema,
96            name,
97            Some(version),
98            pipeline.clone(),
99            with_latest,
100        );
101        insert_cache_generic(
102            &self.failover_cache,
103            schema,
104            name,
105            Some(version),
106            pipeline.clone(),
107            with_latest,
108        );
109    }
110
111    pub(crate) fn get_pipeline_cache(
112        &self,
113        schema: &str,
114        name: &str,
115        version: PipelineVersion,
116    ) -> Result<Option<Arc<Pipeline>>> {
117        get_cache_generic(&self.pipelines, schema, name, version)
118    }
119
120    pub(crate) fn get_failover_cache(
121        &self,
122        schema: &str,
123        name: &str,
124        version: PipelineVersion,
125    ) -> Result<Option<PipelineContent>> {
126        get_cache_generic(&self.failover_cache, schema, name, version)
127    }
128
129    pub(crate) fn get_pipeline_str_cache(
130        &self,
131        schema: &str,
132        name: &str,
133        version: PipelineVersion,
134    ) -> Result<Option<PipelineContent>> {
135        get_cache_generic(&self.original_pipelines, schema, name, version)
136    }
137
138    // remove cache with version and latest in all schemas
139    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
140        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
141        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
142
143        let ks = self
144            .pipelines
145            .iter()
146            .filter_map(|(k, _)| {
147                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
148                    Some(k.clone())
149                } else {
150                    None
151                }
152            })
153            .collect::<Vec<_>>();
154
155        for k in ks {
156            let k = k.as_str();
157            self.pipelines.remove(k);
158            self.original_pipelines.remove(k);
159            self.failover_cache.remove(k);
160        }
161    }
162}
163
164fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
165    cache: &Cache<String, T>,
166    schema: &str,
167    name: &str,
168    version: PipelineVersion,
169    value: T,
170    with_latest: bool,
171) {
172    let k = generate_pipeline_cache_key(schema, name, version);
173    cache.insert(k, value.clone());
174    if with_latest {
175        let k = generate_pipeline_cache_key(schema, name, None);
176        cache.insert(k, value);
177    }
178}
179
180fn get_cache_generic<T: Clone + Send + Sync + 'static>(
181    cache: &Cache<String, T>,
182    schema: &str,
183    name: &str,
184    version: PipelineVersion,
185) -> Result<Option<T>> {
186    // lets try empty schema first
187    let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
188    if let Some(value) = cache.get(&emp_key) {
189        return Ok(Some(value));
190    }
191    // use input schema
192    let schema_k = generate_pipeline_cache_key(schema, name, version);
193    if let Some(value) = cache.get(&schema_k) {
194        return Ok(Some(value));
195    }
196
197    // try all schemas
198    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
199    let mut ks = cache
200        .iter()
201        .filter(|e| e.0.ends_with(&suffix_key))
202        .collect::<Vec<_>>();
203
204    match ks.len() {
205        0 => Ok(None),
206        1 => {
207            let (_, value) = ks.remove(0);
208            Ok(Some(value))
209        }
210        _ => {
211            debug!(
212                "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
213                cache.iter().map(|e| e.0).collect::<Vec<_>>(),
214                emp_key,
215                schema_k,
216                suffix_key
217            );
218            MultiPipelineWithDiffSchemaSnafu {
219                name: name.to_string(),
220                current_schema: schema.to_string(),
221                schemas: ks
222                    .iter()
223                    .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
224                    .collect::<Vec<_>>()
225                    .join(","),
226            }
227            .fail()?
228        }
229    }
230}