pipeline/manager/table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{col, DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
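/// Pipelines saved under the empty schema are visible to all schemas;
/// lookups prefer an empty-schema entry over a schema-specific one (see `get_pipeline_str`).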
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
    /// Create a new PipelineTable.
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

    /// Build the schema for the pipeline table.
    /// Returns the (time index, primary keys, column) definitions.
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

    /// Build the column schemas for inserting a row into the pipeline table.
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

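    /// Build a query context scoped to the catalog and schema of the pipeline table.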
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

    /// Compile a pipeline from a string.
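    ///
    /// A minimal usage sketch (`yaml_str` stands in for a YAML pipeline definition):
    /// ```ignore
    /// let pipeline: Pipeline = PipelineTable::compile_pipeline(yaml_str)?;
    /// ```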
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

    /// Insert a pipeline into the pipeline table.
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
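        // the insertion timestamp doubles as the pipeline version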
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

    /// Get a pipeline by name.
    /// If the pipeline is not in the cache, it is fetched from the table, compiled, and inserted into the cache.
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

    /// Get the original pipeline content by name.
    /// If it is not in the cache, it is fetched from the table and inserted into the cache.
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec;
        match self.find_pipeline(name, version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                pipeline_vec = p;
            }
            Err(e) => {
                match e {
                    Error::CollectRecords { .. } => {
                        // a failure to collect records means the pipeline table is
                        // temporarily unavailable, so fall back to the failover cache
                        METRIC_PIPELINE_TABLE_FIND_COUNT
                            .with_label_values(&["false"])
                            .inc();
                        return self
                            .cache
                            .get_failover_cache(schema, name, version)?
                            .ok_or(PipelineNotFoundSnafu { name, version }.build());
                    }
                    _ => {
                        // propagate any other error
                        return Err(e);
                    }
                }
            }
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        // if there is exactly one result, use it
        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

        // prefer a pipeline under the empty schema;
        // if there is none, fall back to the current schema
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        // multiple pipelines exist, but none under the empty or current schema:
        // return an error
        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

    /// Insert a pipeline into the pipeline table and compile it.
    /// The compiled pipeline will be inserted into the cache.
    /// Newly created pipelines will be saved under the empty schema.
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        // we will use the version in the future
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            self.cache.insert_pipeline_str_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                (pipeline.to_owned(), TimestampNanosecond(version)),
                true,
            );
        }

        Ok((version, compiled_pipeline))
    }

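    /// Delete a pipeline by name and version.
    /// Returns `Ok(None)` if no matching pipeline exists; a concrete version is required.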
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // 0. the HTTP API layer guarantees that version is not None
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // 1. check that the pipeline exists in the catalog
        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        // 2. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;
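        // (irrefutable pattern: DataFusion is the only `DataFrame` variant)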

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        // 3. prepare dml stmt
        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

        // create dml stmt
        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        // 4. execute dml stmt
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // remove both the versioned and the latest cache entries
        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

    // find all pipelines with the given name and version;
    // there could be multiple entries under different schemas.
    // return format: (pipeline content, schema, created_at)
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        // 1. prepare dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        // select all pipelines with the given name and version
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
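            // sort by created_at in descending order so the newest pipeline comes first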
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline: plan: {:?}", plan);

        // 2. execute plan
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
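            // a query produces either a stream or materialized record batches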
            _ => unreachable!(),
        };

        // 3. construct result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into timestamp vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}