pipeline/manager/pipeline_operator.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use datatypes::timestamp::TimestampNanosecond;
use futures::FutureExt;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::TableRef;

use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
use crate::metrics::{
    METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
    METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
};
use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
use crate::Pipeline;

/// PipelineOperator is responsible for managing pipelines.
/// It provides the ability to:
/// - Create a pipeline table if it does not exist
/// - Get a pipeline from the pipeline table
/// - Insert a pipeline into the pipeline table
/// - Compile a pipeline
/// - Add a pipeline table to the cache
/// - Get a pipeline table from the cache
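///
/// # Example (sketch)
///
/// A minimal usage sketch, not compiled here; the `inserter`,
/// `statement_executor`, `catalog_manager` and `query_engine` handles are
/// assumed to come from the frontend setup, and `None` is assumed to select
/// the latest version of the named pipeline:
///
/// ```ignore
/// let operator =
///     PipelineOperator::new(inserter, statement_executor, catalog_manager, query_engine);
/// let pipeline = operator.get_pipeline(query_ctx, "my_pipeline", None).await?;
/// ```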
pub struct PipelineOperator {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
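    /// Cached pipeline table handles, keyed by catalog name.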
    tables: RwLock<HashMap<String, PipelineTableRef>>,
}

impl PipelineOperator {
    /// Create a table request for the pipeline table.
    fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
        let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();

        let create_table_expr = CreateTableExpr {
            catalog_name: catalog.to_string(),
            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
            table_name: PIPELINE_TABLE_NAME.to_string(),
            desc: "GreptimeDB pipeline table for Log".to_string(),
            column_defs,
            time_index,
            primary_keys,
            create_if_not_exists: true,
            table_options: Default::default(),
            table_id: None, // Should and will be assigned by Meta.
            engine: default_engine().to_string(),
        };

        RegisterSystemTableRequest {
            create_table_expr,
            open_hook: None,
        }
    }

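    /// Cache the pipeline table for `catalog`, keeping an already cached entry
    /// if one was inserted concurrently.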
    fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(catalog) {
            return;
        }
        tables.insert(
            catalog.to_string(),
            Arc::new(PipelineTable::new(
                self.inserter.clone(),
                self.statement_executor.clone(),
                table,
                self.query_engine.clone(),
            )),
        );
    }

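    /// Ensure the pipeline table for the catalog in `ctx` is available: check
    /// the local cache first, then the catalog, and only create the table when
    /// it is missing from both.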
    async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
        let catalog = ctx.current_catalog();

        // already exists in the cache
        if self.get_pipeline_table_from_cache(catalog).is_some() {
            return Ok(());
        }

        let RegisterSystemTableRequest {
            create_table_expr: mut expr,
            open_hook: _,
        } = self.create_table_request(catalog);

        // exists in the catalog, just open it
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.table_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            self.add_pipeline_table_to_cache(catalog, table);
            return Ok(());
        }

        // create table
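        // A concurrent request may create the table first; `create_if_not_exists: true`
        // in the request keeps the creation below idempotent.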
        self.statement_executor
            .create_table_inner(&mut expr, None, ctx.clone())
            .await
            .context(CreateTableSnafu)?;

        let schema = &expr.schema_name;
        let table_name = &expr.table_name;

        // get from catalog
        let table = self
            .catalog_manager
            .table(catalog, schema, table_name, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(PipelineTableNotFoundSnafu)?;

        info!(
            "Created pipelines table {} with table id {}.",
            table.table_info().full_table_name(),
            table.table_info().table_id()
        );

        // put it into the cache
        self.add_pipeline_table_to_cache(catalog, table);

        Ok(())
    }

    /// Get a pipeline table from the cache.
    pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
        self.tables.read().unwrap().get(catalog).cloned()
    }
}

impl PipelineOperator {
    /// Create a new PipelineOperator.
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            catalog_manager,
            tables: RwLock::new(HashMap::new()),
            query_engine,
        }
    }

    /// Get a pipeline from the pipeline table.
    pub async fn get_pipeline(
        &self,
        query_ctx: QueryContextRef,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        let schema = query_ctx.current_schema();
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

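        // Record retrieval latency, labeled by whether the lookup succeeded.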
        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .get_pipeline(&schema, name, version)
            .inspect(|re| {
                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }

    /// Get the original (uncompiled) pipeline text and its timestamp by name.
    pub async fn get_pipeline_str(
        &self,
        name: &str,
        version: PipelineVersion,
        query_ctx: QueryContextRef,
    ) -> Result<(String, TimestampNanosecond)> {
        let schema = query_ctx.current_schema();
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .get_pipeline_str(&schema, name, version)
            .inspect(|re| {
                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }

    /// Compile a pipeline and insert it into the pipeline table.
    pub async fn insert_pipeline(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
        query_ctx: QueryContextRef,
    ) -> Result<PipelineInfo> {
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
            .inspect(|re| {
                METRIC_PIPELINE_CREATE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }

    /// Delete a pipeline by name from the pipeline table.
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
        query_ctx: QueryContextRef,
    ) -> Result<Option<()>> {
        // trigger loading of the pipeline table
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .delete_pipeline(&query_ctx.current_schema(), name, version)
            .inspect(|re| {
                METRIC_PIPELINE_DELETE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }

    /// Compile a pipeline.
    pub fn build_pipeline(pipeline: &str) -> Result<Pipeline> {
        PipelineTable::compile_pipeline(pipeline)
    }
}