pipeline/manager/
table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
    /// Create a new PipelineTable.
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

    /// Build the schema for the pipeline table.
    /// Returns the (time index, primary keys, column definitions) tuple.
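    /// Illustrative layout of the resulting table, derived from the column
    /// definitions below:
    ///
    /// ```text
    /// name (Tag) | schema (Tag) | content_type (Tag) | pipeline (Field) | created_at (Timestamp, time index)
    /// ```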
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

    /// Build the column schemas for inserting a row into the pipeline table.
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

    /// Compile a pipeline from a string.
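    ///
    /// A minimal usage sketch (illustrative; `yaml_str` is assumed to hold a
    /// valid YAML pipeline definition):
    ///
    /// ```ignore
    /// let pipeline: Pipeline = PipelineTable::compile_pipeline(yaml_str)?;
    /// ```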
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

    /// Insert a pipeline into the pipeline table.
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

    /// Get a pipeline by name.
    /// If the pipeline is not in the cache, it is fetched from the table,
    /// compiled, and inserted into the cache.
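    ///
    /// Usage sketch (illustrative; assumes a constructed `PipelineTable` and
    /// that passing `None` as the version selects the latest pipeline):
    ///
    /// ```ignore
    /// let pipeline = table.get_pipeline("my_schema", "my_pipeline", None).await?;
    /// ```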
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

    /// Get the original pipeline content by name.
    /// If it is not in the cache, it is fetched from the table and inserted
    /// into the cache.
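    ///
    /// Usage sketch (illustrative):
    ///
    /// ```ignore
    /// let (content, version) = table
    ///     .get_pipeline_str("my_schema", "my_pipeline", None)
    ///     .await?;
    /// ```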
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec;
        match self.find_pipeline(name, version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                pipeline_vec = p;
            }
            Err(e) => {
                match e {
                    Error::CollectRecords { .. } => {
                        // a failure to collect records means the pipeline table is
                        // temporarily unavailable, so fall back to the failover cache
                        METRIC_PIPELINE_TABLE_FIND_COUNT
                            .with_label_values(&["false"])
                            .inc();
                        return self
                            .cache
                            .get_failover_cache(schema, name, version)?
                            .ok_or(PipelineNotFoundSnafu { name, version }.build());
                    }
                    _ => {
                        // propagate any other error as-is
                        return Err(e);
                    }
                }
            }
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        // if there is exactly one result, use it directly
        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

        // prefer a pipeline under the empty (i.e. global) schema;
        // if there is none, fall back to the current schema
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        // multiple pipelines exist, but none under the empty or current schema:
        // return an error
        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

    /// Insert a pipeline into the pipeline table and compile it.
    /// The compiled pipeline will be inserted into the cache.
    /// Newly created pipelines will be saved under the empty schema.
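    ///
    /// Usage sketch (illustrative; the `"yaml"` content type is an assumption):
    ///
    /// ```ignore
    /// let (version, compiled) = table
    ///     .insert_and_compile("my_pipeline", "yaml", yaml_str)
    ///     .await?;
    /// ```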
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        // the returned creation timestamp is used as the pipeline version
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            self.cache.insert_pipeline_str_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                (pipeline.to_owned(), TimestampNanosecond(version)),
                true,
            );
        }

        Ok((version, compiled_pipeline))
    }

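    /// Delete a pipeline by name and version.
    ///
    /// Returns `Ok(None)` if no matching pipeline exists. On success, the
    /// matching rows are removed through a DML delete plan and the related
    /// cache entries (the exact version and the latest) are evicted.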
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // 0. the HTTP API layer ensures that `version` is not None
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // 1. check that the pipeline exists in the catalog
        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        // 2. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        // 3. prepare dml stmt
        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        // create the DML delete statement; the filtered scan above becomes
        // its input plan
        let stmt = DmlStatement::new(
            table_name,
            table_source,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        // 4. execute dml stmt
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // evict both the versioned and the latest cache entries
        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

    // Find all pipelines with the given name and version.
    // There could be multiple pipelines under different schemas.
    // Return format: (pipeline content, schema, created_at)
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        // 1. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        // select all pipelines with name and version
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
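            // created_at in descending order (latest first), nulls first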
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        // 2. execute plan
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        // 3. construct result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into timestamp vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}