// pipeline/manager/pipeline_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::sync::Arc;
16use std::time::Duration;
17
18use datatypes::timestamp::TimestampNanosecond;
19use moka::sync::Cache;
20
21use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
22use crate::etl::Pipeline;
23use crate::manager::PipelineVersion;
24use crate::table::EMPTY_SCHEMA_NAME;
25use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
26
/// Maximum number of entries each pipeline cache can hold.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Time-to-live for cached entries; a cached pipeline is re-fetched
/// from the pipeline table after this duration.
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
31
/// Pipeline cache is located on a separate file on purpose,
/// to encapsulate inner cache. Only public methods are exposed.
pub(crate) struct PipelineCache {
    // Compiled pipelines, keyed by `generate_pipeline_cache_key` output
    // (schema-prefixed; see the '/' split in `get_cache_generic`).
    pipelines: Cache<String, Arc<Pipeline>>,
    // Original pipeline source text plus its timestamp, same key scheme.
    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
}
38
39impl PipelineCache {
40    pub(crate) fn new() -> Self {
41        Self {
42            pipelines: Cache::builder()
43                .max_capacity(PIPELINES_CACHE_SIZE)
44                .time_to_live(PIPELINES_CACHE_TTL)
45                .build(),
46            original_pipelines: Cache::builder()
47                .max_capacity(PIPELINES_CACHE_SIZE)
48                .time_to_live(PIPELINES_CACHE_TTL)
49                .build(),
50        }
51    }
52
53    pub(crate) fn insert_pipeline_cache(
54        &self,
55        schema: &str,
56        name: &str,
57        version: PipelineVersion,
58        pipeline: Arc<Pipeline>,
59        with_latest: bool,
60    ) {
61        insert_cache_generic(
62            &self.pipelines,
63            schema,
64            name,
65            version,
66            pipeline,
67            with_latest,
68        );
69    }
70
71    pub(crate) fn insert_pipeline_str_cache(
72        &self,
73        schema: &str,
74        name: &str,
75        version: PipelineVersion,
76        pipeline: (String, TimestampNanosecond),
77        with_latest: bool,
78    ) {
79        insert_cache_generic(
80            &self.original_pipelines,
81            schema,
82            name,
83            version,
84            pipeline,
85            with_latest,
86        );
87    }
88
89    pub(crate) fn get_pipeline_cache(
90        &self,
91        schema: &str,
92        name: &str,
93        version: PipelineVersion,
94    ) -> Result<Option<Arc<Pipeline>>> {
95        get_cache_generic(&self.pipelines, schema, name, version)
96    }
97
98    pub(crate) fn get_pipeline_str_cache(
99        &self,
100        schema: &str,
101        name: &str,
102        version: PipelineVersion,
103    ) -> Result<Option<(String, TimestampNanosecond)>> {
104        get_cache_generic(&self.original_pipelines, schema, name, version)
105    }
106
107    // remove cache with version and latest in all schemas
108    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
109        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
110        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
111
112        let ks = self
113            .pipelines
114            .iter()
115            .filter_map(|(k, _)| {
116                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
117                    Some(k.clone())
118                } else {
119                    None
120                }
121            })
122            .collect::<Vec<_>>();
123
124        for k in ks {
125            let k = k.as_str();
126            self.pipelines.remove(k);
127            self.original_pipelines.remove(k);
128        }
129    }
130}
131
132fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
133    cache: &Cache<String, T>,
134    schema: &str,
135    name: &str,
136    version: PipelineVersion,
137    value: T,
138    with_latest: bool,
139) {
140    let k = generate_pipeline_cache_key(schema, name, version);
141    cache.insert(k, value.clone());
142    if with_latest {
143        let k = generate_pipeline_cache_key(schema, name, None);
144        cache.insert(k, value);
145    }
146}
147
148fn get_cache_generic<T: Clone + Send + Sync + 'static>(
149    cache: &Cache<String, T>,
150    schema: &str,
151    name: &str,
152    version: PipelineVersion,
153) -> Result<Option<T>> {
154    // lets try empty schema first
155    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
156    if let Some(value) = cache.get(&k) {
157        return Ok(Some(value));
158    }
159    // use input schema
160    let k = generate_pipeline_cache_key(schema, name, version);
161    if let Some(value) = cache.get(&k) {
162        return Ok(Some(value));
163    }
164
165    // try all schemas
166    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
167    let mut ks = cache
168        .iter()
169        .filter(|e| e.0.ends_with(&suffix_key))
170        .collect::<Vec<_>>();
171
172    match ks.len() {
173        0 => Ok(None),
174        1 => Ok(Some(ks.remove(0).1)),
175        _ => MultiPipelineWithDiffSchemaSnafu {
176            schemas: ks
177                .iter()
178                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
179                .collect::<Vec<_>>()
180                .join(","),
181        }
182        .fail()?,
183    }
184}