pipeline/manager/table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::logical_expr::col;
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    PipelineNotFoundSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";

/// Pipeline table cache size.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Pipeline table cache time to live.
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);

/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
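/// Both compiled pipelines and their original sources (with versions) are
/// cached under keys derived from `(schema, name, version)`, with a short TTL.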
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    pipelines: Cache<String, Arc<Pipeline>>,
    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
}

impl PipelineTable {
    /// Create a new PipelineTable.
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .build(),
            original_pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .build(),
        }
    }

    /// Build the schema for the pipeline table.
    /// Returns the (time index, primary keys, column) definitions.
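    ///
    /// # Example (illustrative usage)
    /// ```ignore
    /// let (time_index, primary_keys, columns) = PipelineTable::build_pipeline_schema();
    /// assert_eq!(time_index, "created_at");
    /// assert_eq!(primary_keys, vec!["schema", "name", "content_type"]);
    /// assert_eq!(columns.len(), 5);
    /// ```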
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

    /// Build the column schemas for inserting a row into the pipeline table.
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

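    /// Build a query context scoped to the catalog and schema that own the
    /// pipeline table.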
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

    /// Compile a pipeline from a string.
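    ///
    /// # Example (a sketch; the exact YAML layout depends on the pipeline DSL)
    /// ```ignore
    /// let yaml = r#"
    /// processors:
    ///   - date:
    ///       field: time
    ///       formats:
    ///         - "%Y-%m-%d %H:%M:%S"
    /// "#;
    /// let pipeline = PipelineTable::compile_pipeline(yaml)?;
    /// ```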
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

    /// Insert a pipeline into the pipeline table.
    async fn insert_pipeline_to_pipeline_table(
        &self,
        schema: &str,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(schema.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

    /// Get a compiled pipeline by name.
    /// On a cache miss, the pipeline is fetched from the table, compiled, and cached.
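    ///
    /// # Example (illustrative; `table` is an initialized `PipelineTable`)
    /// ```ignore
    /// // Passing `None` as the version fetches the latest pipeline.
    /// let pipeline = table.get_pipeline("public", "my_pipeline", None).await?;
    /// ```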
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self
            .pipelines
            .get(&generate_pipeline_cache_key(schema, name, version))
        {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.pipelines.insert(
            generate_pipeline_cache_key(schema, name, version),
            compiled_pipeline.clone(),
        );
        Ok(compiled_pipeline)
    }

    /// Get the original pipeline source and its version by name.
    /// On a cache miss, the source is fetched from the table and cached; no
    /// compilation happens here.
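    ///
    /// ```ignore
    /// // Illustrative: returns the raw source plus its version timestamp.
    /// let (source, version) = table.get_pipeline_str("public", "my_pipeline", None).await?;
    /// ```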
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self
            .original_pipelines
            .get(&generate_pipeline_cache_key(schema, name, version))
        {
            return Ok(pipeline);
        }
        let pipeline = self
            .find_pipeline(schema, name, version)
            .await?
            .context(PipelineNotFoundSnafu { name, version })?;
        self.original_pipelines.insert(
            generate_pipeline_cache_key(schema, name, version),
            pipeline.clone(),
        );
        Ok(pipeline)
    }

    /// Insert a pipeline into the pipeline table and compile it.
    /// The compiled pipeline will be inserted into the cache.
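    ///
    /// ```ignore
    /// // Illustrative: returns the version timestamp and the compiled pipeline.
    /// let (version, compiled) = table
    ///     .insert_and_compile("public", "my_pipeline", "yaml", yaml_source)
    ///     .await?;
    /// ```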
    pub async fn insert_and_compile(
        &self,
        schema: &str,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        // the created-at timestamp doubles as the pipeline version
        let version = self
            .insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline)
            .await?;

        {
            self.pipelines.insert(
                generate_pipeline_cache_key(schema, name, None),
                compiled_pipeline.clone(),
            );
            self.pipelines.insert(
                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
                compiled_pipeline.clone(),
            );

            self.original_pipelines.insert(
                generate_pipeline_cache_key(schema, name, None),
                (pipeline.to_owned(), TimestampNanosecond(version)),
            );
            self.original_pipelines.insert(
                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
                (pipeline.to_owned(), TimestampNanosecond(version)),
            );
        }

        Ok((version, compiled_pipeline))
    }

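    /// Delete a pipeline by name and version from the pipeline table, and
    /// evict it from both caches. Returns `Ok(None)` if no such pipeline
    /// exists; a concrete version is required.
    ///
    /// ```ignore
    /// // Illustrative: the version must be `Some(..)`, never `None`.
    /// table.delete_pipeline("public", "my_pipeline", Some(version)).await?;
    /// ```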
    pub async fn delete_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // 0. the HTTP API layer guarantees the version is not None; reject None here anyway
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // 1. check that the pipeline exists in the catalog
        let pipeline = self.find_pipeline(schema, name, version).await?;
        if pipeline.is_none() {
            return Ok(None);
        }

        // 2. prepare the dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(schema, name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        // 3. prepare the DML statement
        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

        // create the delete statement
        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        // 4. execute the DML statement
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // evict both the versioned and the latest cache entries
        self.pipelines
            .remove(&generate_pipeline_cache_key(schema, name, version));
        self.pipelines
            .remove(&generate_pipeline_cache_key(schema, name, None));
        self.original_pipelines
            .remove(&generate_pipeline_cache_key(schema, name, version));
        self.original_pipelines
            .remove(&generate_pipeline_cache_key(schema, name, None));

        Ok(Some(()))
    }

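    /// Look up the pipeline source and created-at timestamp for the given
    /// schema and name. With `version: None`, the newest row (by `created_at`)
    /// is returned; otherwise the query is pinned to that version.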
    async fn find_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<(String, TimestampNanosecond)>> {
        // 1. prepare the dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(schema, name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            // sort descending by created_at so the newest version comes first
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .limit(0, Some(1))
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline: plan: {:?}", plan);

        // 2. execute the plan
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        // 3. construct the result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(None);
        }

        // the plan is limited to a single row with two columns
        ensure!(
            records.len() == 1 && records[0].num_columns() == 2,
            PipelineNotFoundSnafu { name, version }
        );

        let pipeline_content_column = records[0].column(0);
        let pipeline_content = pipeline_content_column
            .as_any()
            .downcast_ref::<StringVector>()
            .with_context(|| CastTypeSnafu {
                msg: format!(
                    "can't downcast {:?} array into string vector",
                    pipeline_content_column.data_type()
                ),
            })?;

        let pipeline_created_at_column = records[0].column(1);
        let pipeline_created_at = pipeline_created_at_column
            .as_any()
            .downcast_ref::<TimestampNanosecondVector>()
            .with_context(|| CastTypeSnafu {
                msg: format!(
                    "can't downcast {:?} array into timestamp nanosecond vector",
                    pipeline_created_at_column.data_type()
                ),
            })?;

        debug!(
            "find_pipeline: pipeline_content: {:?}, pipeline_created_at: {:?}",
            pipeline_content, pipeline_created_at
        );

        ensure!(
            pipeline_content.len() == 1,
            PipelineNotFoundSnafu { name, version }
        );

        // Safety: both vectors are asserted non-empty above
        Ok(Some((
            pipeline_content.get_data(0).unwrap().to_string(),
            pipeline_created_at.get_data(0).unwrap(),
        )))
    }
}