// pipeline/manager/pipeline_operator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17use std::time::Instant;
18
19use api::v1::CreateTableExpr;
20use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
21use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
22use common_telemetry::info;
23use common_time::FOREVER;
24use datatypes::timestamp::TimestampNanosecond;
25use futures::FutureExt;
26use operator::insert::InserterRef;
27use operator::statement::StatementExecutorRef;
28use query::QueryEngineRef;
29use session::context::QueryContextRef;
30use snafu::{OptionExt, ResultExt};
31use table::TableRef;
32use table::requests::TTL_KEY;
33
34use crate::Pipeline;
35use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
36use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
37use crate::metrics::{
38    METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
39    METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
40};
41use crate::table::{PIPELINE_TABLE_NAME, PipelineTable};
42
/// PipelineOperator is responsible for managing pipelines.
/// It provides the ability to:
/// - Create a pipeline table if it does not exist
/// - Get a pipeline from the pipeline table
/// - Insert a pipeline into the pipeline table
/// - Compile a pipeline
/// - Add a pipeline table to the cache
/// - Get a pipeline table from the cache
pub struct PipelineOperator {
    /// Performs row inserts; handed to each cached [`PipelineTable`].
    inserter: InserterRef,
    /// Executes DDL (the pipeline table's `CREATE TABLE`); also handed to
    /// each cached [`PipelineTable`].
    statement_executor: StatementExecutorRef,
    /// Used to look up whether the pipeline table already exists in a catalog.
    catalog_manager: CatalogManagerRef,
    /// Query engine handed to each cached [`PipelineTable`].
    query_engine: QueryEngineRef,
    /// Per-catalog cache of pipeline-table wrappers, keyed by catalog name.
    tables: RwLock<HashMap<String, PipelineTableRef>>,
}
58
59impl PipelineOperator {
60    /// Create a table request for the pipeline table.
61    fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
62        let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
63
64        let mut table_options = HashMap::new();
65        table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
66
67        let create_table_expr = CreateTableExpr {
68            catalog_name: catalog.to_string(),
69            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
70            table_name: PIPELINE_TABLE_NAME.to_string(),
71            desc: "GreptimeDB pipeline table for Log".to_string(),
72            column_defs,
73            time_index,
74            primary_keys,
75            create_if_not_exists: true,
76            table_options,
77            table_id: None, // Should and will be assigned by Meta.
78            engine: default_engine().to_string(),
79        };
80
81        RegisterSystemTableRequest {
82            create_table_expr,
83            open_hook: None,
84        }
85    }
86
87    fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
88        let mut tables = self.tables.write().unwrap();
89        if tables.contains_key(catalog) {
90            return;
91        }
92        tables.insert(
93            catalog.to_string(),
94            Arc::new(PipelineTable::new(
95                self.inserter.clone(),
96                self.statement_executor.clone(),
97                table,
98                self.query_engine.clone(),
99            )),
100        );
101    }
102
103    async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
104        let catalog = ctx.current_catalog();
105
106        // exist in cache
107        if self.get_pipeline_table_from_cache(catalog).is_some() {
108            return Ok(());
109        }
110
111        let RegisterSystemTableRequest {
112            create_table_expr: mut expr,
113            open_hook: _,
114        } = self.create_table_request(catalog);
115
116        // exist in catalog, just open
117        if let Some(table) = self
118            .catalog_manager
119            .table(
120                &expr.catalog_name,
121                &expr.schema_name,
122                &expr.table_name,
123                Some(&ctx),
124            )
125            .await
126            .context(CatalogSnafu)?
127        {
128            self.add_pipeline_table_to_cache(catalog, table);
129            return Ok(());
130        }
131
132        // create table
133        self.statement_executor
134            .create_table_inner(&mut expr, None, ctx.clone())
135            .await
136            .context(CreateTableSnafu)?;
137
138        let schema = &expr.schema_name;
139        let table_name = &expr.table_name;
140
141        // get from catalog
142        let table = self
143            .catalog_manager
144            .table(catalog, schema, table_name, Some(&ctx))
145            .await
146            .context(CatalogSnafu)?
147            .context(PipelineTableNotFoundSnafu)?;
148
149        info!(
150            "Created pipelines table {} with table id {}.",
151            table.table_info().full_table_name(),
152            table.table_info().table_id()
153        );
154
155        // put to cache
156        self.add_pipeline_table_to_cache(catalog, table);
157
158        Ok(())
159    }
160
161    /// Get a pipeline table from the cache.
162    pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
163        self.tables.read().unwrap().get(catalog).cloned()
164    }
165}
166
167impl PipelineOperator {
168    /// Create a new PipelineOperator.
169    pub fn new(
170        inserter: InserterRef,
171        statement_executor: StatementExecutorRef,
172        catalog_manager: CatalogManagerRef,
173        query_engine: QueryEngineRef,
174    ) -> Self {
175        Self {
176            inserter,
177            statement_executor,
178            catalog_manager,
179            tables: RwLock::new(HashMap::new()),
180            query_engine,
181        }
182    }
183
184    /// Get a pipeline from the pipeline table.
185    pub async fn get_pipeline(
186        &self,
187        query_ctx: QueryContextRef,
188        name: &str,
189        version: PipelineVersion,
190    ) -> Result<Arc<Pipeline>> {
191        let schema = query_ctx.current_schema();
192        self.create_pipeline_table_if_not_exists(query_ctx.clone())
193            .await?;
194
195        let timer = Instant::now();
196        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
197            .context(PipelineTableNotFoundSnafu)?
198            .get_pipeline(&schema, name, version)
199            .inspect(|re| {
200                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
201                    .with_label_values(&[&re.is_ok().to_string()])
202                    .observe(timer.elapsed().as_secs_f64())
203            })
204            .await
205    }
206
207    /// Get a original pipeline by name.
208    pub async fn get_pipeline_str(
209        &self,
210        name: &str,
211        version: PipelineVersion,
212        query_ctx: QueryContextRef,
213    ) -> Result<(String, TimestampNanosecond)> {
214        let schema = query_ctx.current_schema();
215        self.create_pipeline_table_if_not_exists(query_ctx.clone())
216            .await?;
217
218        let timer = Instant::now();
219        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
220            .context(PipelineTableNotFoundSnafu)?
221            .get_pipeline_str(&schema, name, version)
222            .inspect(|re| {
223                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
224                    .with_label_values(&[&re.is_ok().to_string()])
225                    .observe(timer.elapsed().as_secs_f64())
226            })
227            .await
228            .map(|p| (p.content, p.version))
229    }
230
231    /// Insert a pipeline into the pipeline table.
232    pub async fn insert_pipeline(
233        &self,
234        name: &str,
235        content_type: &str,
236        pipeline: &str,
237        query_ctx: QueryContextRef,
238    ) -> Result<PipelineInfo> {
239        self.create_pipeline_table_if_not_exists(query_ctx.clone())
240            .await?;
241
242        let timer = Instant::now();
243        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
244            .context(PipelineTableNotFoundSnafu)?
245            .insert_and_compile(name, content_type, pipeline)
246            .inspect(|re| {
247                METRIC_PIPELINE_CREATE_HISTOGRAM
248                    .with_label_values(&[&re.is_ok().to_string()])
249                    .observe(timer.elapsed().as_secs_f64())
250            })
251            .await
252    }
253
254    /// Delete a pipeline by name from pipeline table.
255    pub async fn delete_pipeline(
256        &self,
257        name: &str,
258        version: PipelineVersion,
259        query_ctx: QueryContextRef,
260    ) -> Result<Option<()>> {
261        // trigger load pipeline table
262        self.create_pipeline_table_if_not_exists(query_ctx.clone())
263            .await?;
264
265        let timer = Instant::now();
266        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
267            .context(PipelineTableNotFoundSnafu)?
268            .delete_pipeline(name, version)
269            .inspect(|re| {
270                METRIC_PIPELINE_DELETE_HISTOGRAM
271                    .with_label_values(&[&re.is_ok().to_string()])
272                    .observe(timer.elapsed().as_secs_f64())
273            })
274            .await
275    }
276
277    /// Compile a pipeline.
278    pub fn build_pipeline(pipeline: &str) -> Result<Pipeline> {
279        PipelineTable::compile_pipeline(pipeline)
280    }
281}