// pipeline/manager/pipeline_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::Arc;
16use std::time::Duration;
17
18use datatypes::timestamp::TimestampNanosecond;
19use moka::sync::Cache;
20
21use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
22use crate::etl::Pipeline;
23use crate::manager::PipelineVersion;
24use crate::table::EMPTY_SCHEMA_NAME;
25use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
26
/// Maximum number of entries each inner pipeline cache may hold.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Time-to-live for the compiled and original pipeline caches; after this an
/// entry must be re-read from the pipeline table. (The failover cache has no TTL.)
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
31
/// Pipeline cache is located on a separate file on purpose,
/// to encapsulate inner cache. Only public methods are exposed.
pub(crate) struct PipelineCache {
    /// Compiled pipelines, keyed by the schema/name/version cache key (TTL-bound).
    pipelines: Cache<String, Arc<Pipeline>>,
    /// Original pipeline text plus its creation timestamp (TTL-bound).
    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
    /// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
    /// The failover cache never expires, but it will be updated when the pipelines cache is updated.
    failover_cache: Cache<String, (String, TimestampNanosecond)>,
}
41
42impl PipelineCache {
43    pub(crate) fn new() -> Self {
44        Self {
45            pipelines: Cache::builder()
46                .max_capacity(PIPELINES_CACHE_SIZE)
47                .time_to_live(PIPELINES_CACHE_TTL)
48                .build(),
49            original_pipelines: Cache::builder()
50                .max_capacity(PIPELINES_CACHE_SIZE)
51                .time_to_live(PIPELINES_CACHE_TTL)
52                .build(),
53            failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
54        }
55    }
56
57    pub(crate) fn insert_pipeline_cache(
58        &self,
59        schema: &str,
60        name: &str,
61        version: PipelineVersion,
62        pipeline: Arc<Pipeline>,
63        with_latest: bool,
64    ) {
65        insert_cache_generic(
66            &self.pipelines,
67            schema,
68            name,
69            version,
70            pipeline.clone(),
71            with_latest,
72        );
73    }
74
75    pub(crate) fn insert_pipeline_str_cache(
76        &self,
77        schema: &str,
78        name: &str,
79        version: PipelineVersion,
80        pipeline: (String, TimestampNanosecond),
81        with_latest: bool,
82    ) {
83        insert_cache_generic(
84            &self.original_pipelines,
85            schema,
86            name,
87            version,
88            pipeline.clone(),
89            with_latest,
90        );
91        insert_cache_generic(
92            &self.failover_cache,
93            schema,
94            name,
95            version,
96            pipeline,
97            with_latest,
98        );
99    }
100
101    pub(crate) fn get_pipeline_cache(
102        &self,
103        schema: &str,
104        name: &str,
105        version: PipelineVersion,
106    ) -> Result<Option<Arc<Pipeline>>> {
107        get_cache_generic(&self.pipelines, schema, name, version)
108    }
109
110    pub(crate) fn get_failover_cache(
111        &self,
112        schema: &str,
113        name: &str,
114        version: PipelineVersion,
115    ) -> Result<Option<(String, TimestampNanosecond)>> {
116        get_cache_generic(&self.failover_cache, schema, name, version)
117    }
118
119    pub(crate) fn get_pipeline_str_cache(
120        &self,
121        schema: &str,
122        name: &str,
123        version: PipelineVersion,
124    ) -> Result<Option<(String, TimestampNanosecond)>> {
125        get_cache_generic(&self.original_pipelines, schema, name, version)
126    }
127
128    // remove cache with version and latest in all schemas
129    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
130        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
131        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
132
133        let ks = self
134            .pipelines
135            .iter()
136            .filter_map(|(k, _)| {
137                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
138                    Some(k.clone())
139                } else {
140                    None
141                }
142            })
143            .collect::<Vec<_>>();
144
145        for k in ks {
146            let k = k.as_str();
147            self.pipelines.remove(k);
148            self.original_pipelines.remove(k);
149            self.failover_cache.remove(k);
150        }
151    }
152}
153
154fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
155    cache: &Cache<String, T>,
156    schema: &str,
157    name: &str,
158    version: PipelineVersion,
159    value: T,
160    with_latest: bool,
161) {
162    let k = generate_pipeline_cache_key(schema, name, version);
163    cache.insert(k, value.clone());
164    if with_latest {
165        let k = generate_pipeline_cache_key(schema, name, None);
166        cache.insert(k, value);
167    }
168}
169
170fn get_cache_generic<T: Clone + Send + Sync + 'static>(
171    cache: &Cache<String, T>,
172    schema: &str,
173    name: &str,
174    version: PipelineVersion,
175) -> Result<Option<T>> {
176    // lets try empty schema first
177    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
178    if let Some(value) = cache.get(&k) {
179        return Ok(Some(value));
180    }
181    // use input schema
182    let k = generate_pipeline_cache_key(schema, name, version);
183    if let Some(value) = cache.get(&k) {
184        return Ok(Some(value));
185    }
186
187    // try all schemas
188    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
189    let mut ks = cache
190        .iter()
191        .filter(|e| e.0.ends_with(&suffix_key))
192        .collect::<Vec<_>>();
193
194    match ks.len() {
195        0 => Ok(None),
196        1 => Ok(Some(ks.remove(0).1)),
197        _ => MultiPipelineWithDiffSchemaSnafu {
198            schemas: ks
199                .iter()
200                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
201                .collect::<Vec<_>>()
202                .join(","),
203        }
204        .fail()?,
205    }
206}