pipeline/manager/
table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{col, DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
    /// Create a new PipelineTable.
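    ///
    /// A construction sketch; `inserter`, `statement_executor`, `table`, and
    /// `query_engine` are hypothetical, already-initialized handles:
    ///
    /// ```ignore
    /// let pipeline_table =
    ///     PipelineTable::new(inserter, statement_executor, table, query_engine);
    /// ```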
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

    /// Build the schema for the pipeline table.
    /// Returns the time index name, the primary key names, and the column definitions.
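    ///
    /// A minimal sketch of consuming the returned tuple (variable names are
    /// illustrative):
    ///
    /// ```ignore
    /// let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
    /// assert_eq!(time_index, "created_at");
    /// // `schema`, `name`, and `content_type` are the primary key columns.
    /// assert_eq!(primary_keys.len(), 3);
    /// assert_eq!(column_defs.len(), 5);
    /// ```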
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

    /// Build the column schemas for inserting a row into the pipeline table.
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

    /// Compile a pipeline from a string.
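    ///
    /// A minimal sketch; the YAML below is illustrative of the pipeline
    /// definition format, not a definition shipped with this crate:
    ///
    /// ```ignore
    /// let yaml = r#"
    /// processors:
    ///   - date:
    ///       field: time
    ///       formats:
    ///         - "%Y-%m-%d %H:%M:%S"
    /// transform:
    ///   - fields:
    ///       - message
    ///     type: string
    /// "#;
    /// let pipeline = PipelineTable::compile_pipeline(yaml)?;
    /// ```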
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

    /// Insert a pipeline into the pipeline table.
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

    /// Get a pipeline by name.
    /// If the pipeline is not in the cache, it is fetched from the table,
    /// compiled, and inserted into the cache.
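    ///
    /// A usage sketch (`pipeline_table` is an existing instance; passing
    /// `None` as the version is assumed to resolve to the latest pipeline):
    ///
    /// ```ignore
    /// let pipeline: Arc<Pipeline> = pipeline_table
    ///     .get_pipeline("public", "my_pipeline", None)
    ///     .await?;
    /// ```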
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

    /// Get the original pipeline content by name.
    /// If the pipeline is not in the cache, it is fetched from the table and
    /// inserted into the cache.
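    ///
    /// A usage sketch (`pipeline_table` is an existing instance; `None` is
    /// assumed to select the latest version):
    ///
    /// ```ignore
    /// let (content, version) = pipeline_table
    ///     .get_pipeline_str("public", "my_pipeline", None)
    ///     .await?;
    /// ```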
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec = self.find_pipeline(name, version).await?;
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        // if there is exactly one result, use it
        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

        // prefer a pipeline under the empty schema;
        // if there is none, fall back to the current schema
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        // multiple pipelines exist, none under the empty or current schema:
        // return an error
        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

    /// Insert a pipeline into the pipeline table and compile it.
    /// The compiled pipeline is inserted into the cache.
    /// Newly created pipelines are saved under the empty schema.
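    ///
    /// A usage sketch (`pipeline_table` and the `yaml` definition are assumed
    /// to exist; "yaml" is the content type stored alongside the pipeline):
    ///
    /// ```ignore
    /// let (version, compiled) = pipeline_table
    ///     .insert_and_compile("my_pipeline", "yaml", yaml)
    ///     .await?;
    /// ```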
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        // we will use the version in the future
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            self.cache.insert_pipeline_str_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                (pipeline.to_owned(), TimestampNanosecond(version)),
                true,
            );
        }

        Ok((version, compiled_pipeline))
    }

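    /// Delete a pipeline by name and version.
    /// Returns `Ok(None)` if no matching pipeline exists. The version must not
    /// be `None`; the HTTP API layer guarantees this.
    ///
    /// A usage sketch (`pipeline_table` and `version` are assumed to exist):
    ///
    /// ```ignore
    /// if pipeline_table.delete_pipeline("my_pipeline", version).await?.is_some() {
    ///     // the pipeline existed and was deleted
    /// }
    /// ```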
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // 0. the HTTP API layer ensures that version is not None
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // 1. check that the pipeline exists in the catalog
        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        // 2. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        // 3. prepare dml stmt
        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

        // create dml stmt
        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        // 4. execute dml stmt
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // remove cache with version and latest
        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

    // find all pipelines with the given name and version
    // there could be multiple rows with different schemas
    // return format: (pipeline content, schema, created_at)
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        // 1. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        // select all pipelines with name and version
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        // 2. execute plan
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        // 3. construct result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        ensure!(
            !records.is_empty() && records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into scalar vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}