pipeline/manager/
table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::{
19    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
20    RowInsertRequests, Rows, SemanticType,
21};
22use arrow::array::{Array, AsArray};
23use arrow::datatypes::TimestampNanosecondType;
24use common_query::OutputData;
25use common_recordbatch::util as record_util;
26use common_telemetry::{debug, info};
27use common_time::timestamp::{TimeUnit, Timestamp};
28use datafusion::datasource::DefaultTableSource;
29use datafusion::logical_expr::col;
30use datafusion_common::TableReference;
31use datafusion_expr::{DmlStatement, LogicalPlan};
32use datatypes::timestamp::TimestampNanosecond;
33use itertools::Itertools;
34use operator::insert::InserterRef;
35use operator::statement::StatementExecutorRef;
36use query::QueryEngineRef;
37use session::context::{QueryContextBuilder, QueryContextRef};
38use snafu::{OptionExt, ResultExt, ensure};
39use table::TableRef;
40use table::metadata::TableInfo;
41use table::table::adapter::DfTableProviderAdapter;
42
43use crate::error::{
44    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
45    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
46    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
47};
48use crate::etl::{Content, Pipeline, parse};
49use crate::manager::pipeline_cache::PipelineCache;
50use crate::manager::{PipelineInfo, PipelineVersion};
51use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
52use crate::util::prepare_dataframe_conditions;
53
/// Name of the table that stores pipelines.
54pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
/// Tag column holding the pipeline name.
55pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
/// Tag column holding the schema a pipeline is stored under.
56const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
/// Tag column holding the content type of the pipeline.
57const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
/// Field column holding the raw pipeline definition.
58const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
/// Timestamp column (the table's time index) recording when a pipeline was created.
59pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
/// Schema value written for newly inserted pipelines. When several pipelines share
/// a name, the one stored under this empty schema is preferred on lookup.
60pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
61
62/// PipelineTable is a table that stores the pipeline schema and content.
63/// Every catalog has its own pipeline table.
///
/// Writes go through `inserter`, reads and deletes go through `query_engine`,
/// and lookups are answered from `cache` whenever possible.
64pub struct PipelineTable {
    /// Handles the row insert requests that store new pipelines.
65    inserter: InserterRef,
    /// Handed to the inserter together with every row insert request.
66    statement_executor: StatementExecutorRef,
    /// The underlying pipeline table; its table info also supplies the catalog and
    /// schema used to build query contexts.
67    table: TableRef,
    /// Reads the table and executes the lookup and delete plans.
68    query_engine: QueryEngineRef,
    /// Cache of compiled pipelines and raw pipeline strings, including the
    /// failover entries consulted when collecting query results fails.
69    cache: PipelineCache,
70}
71
72impl PipelineTable {
73    /// Create a new PipelineTable.
74    pub fn new(
75        inserter: InserterRef,
76        statement_executor: StatementExecutorRef,
77        table: TableRef,
78        query_engine: QueryEngineRef,
79    ) -> Self {
80        Self {
81            inserter,
82            statement_executor,
83            table,
84            query_engine,
85            cache: PipelineCache::new(),
86        }
87    }
88
89    /// Build the schema for the pipeline table.
90    /// Returns the (time index, primary keys, column) definitions.
91    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
92        (
93            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
94            vec![
95                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
96                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
97                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
98            ],
99            vec![
100                ColumnDef {
101                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
102                    data_type: ColumnDataType::String as i32,
103                    is_nullable: false,
104                    default_constraint: vec![],
105                    semantic_type: SemanticType::Tag as i32,
106                    comment: "".to_string(),
107                    datatype_extension: None,
108                    options: None,
109                },
110                ColumnDef {
111                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
112                    data_type: ColumnDataType::String as i32,
113                    is_nullable: false,
114                    default_constraint: vec![],
115                    semantic_type: SemanticType::Tag as i32,
116                    comment: "".to_string(),
117                    datatype_extension: None,
118                    options: None,
119                },
120                ColumnDef {
121                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
122                    data_type: ColumnDataType::String as i32,
123                    is_nullable: false,
124                    default_constraint: vec![],
125                    semantic_type: SemanticType::Tag as i32,
126                    comment: "".to_string(),
127                    datatype_extension: None,
128                    options: None,
129                },
130                ColumnDef {
131                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
132                    data_type: ColumnDataType::String as i32,
133                    is_nullable: false,
134                    default_constraint: vec![],
135                    semantic_type: SemanticType::Field as i32,
136                    comment: "".to_string(),
137                    datatype_extension: None,
138                    options: None,
139                },
140                ColumnDef {
141                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
142                    data_type: ColumnDataType::TimestampNanosecond as i32,
143                    is_nullable: false,
144                    default_constraint: vec![],
145                    semantic_type: SemanticType::Timestamp as i32,
146                    comment: "".to_string(),
147                    datatype_extension: None,
148                    options: None,
149                },
150            ],
151        )
152    }
153
154    /// Build the column schemas for inserting a row into the pipeline table.
155    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
156        vec![
157            PbColumnSchema {
158                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
159                datatype: ColumnDataType::String.into(),
160                semantic_type: SemanticType::Tag.into(),
161                ..Default::default()
162            },
163            PbColumnSchema {
164                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
165                datatype: ColumnDataType::String.into(),
166                semantic_type: SemanticType::Tag.into(),
167                ..Default::default()
168            },
169            PbColumnSchema {
170                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
171                datatype: ColumnDataType::String.into(),
172                semantic_type: SemanticType::Tag.into(),
173                ..Default::default()
174            },
175            PbColumnSchema {
176                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
177                datatype: ColumnDataType::String.into(),
178                semantic_type: SemanticType::Field.into(),
179                ..Default::default()
180            },
181            PbColumnSchema {
182                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
183                datatype: ColumnDataType::TimestampNanosecond.into(),
184                semantic_type: SemanticType::Timestamp.into(),
185                ..Default::default()
186            },
187        ]
188    }
189
190    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
191        QueryContextBuilder::default()
192            .current_catalog(table_info.catalog_name.clone())
193            .current_schema(table_info.schema_name.clone())
194            .build()
195            .into()
196    }
197
198    /// Compile a pipeline from a string.
199    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
200        let yaml_content = Content::Yaml(pipeline);
201        parse(&yaml_content)
202    }
203
204    /// Insert a pipeline into the pipeline table.
205    async fn insert_pipeline_to_pipeline_table(
206        &self,
207        name: &str,
208        content_type: &str,
209        pipeline: &str,
210    ) -> Result<Timestamp> {
211        let now = Timestamp::current_time(TimeUnit::Nanosecond);
212
213        let table_info = self.table.table_info();
214
215        let insert = RowInsertRequest {
216            table_name: PIPELINE_TABLE_NAME.to_string(),
217            rows: Some(Rows {
218                schema: Self::build_insert_column_schemas(),
219                rows: vec![Row {
220                    values: vec![
221                        ValueData::StringValue(name.to_string()).into(),
222                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
223                        ValueData::StringValue(content_type.to_string()).into(),
224                        ValueData::StringValue(pipeline.to_string()).into(),
225                        ValueData::TimestampNanosecondValue(now.value()).into(),
226                    ],
227                }],
228            }),
229        };
230
231        let requests = RowInsertRequests {
232            inserts: vec![insert],
233        };
234
235        let output = self
236            .inserter
237            .handle_row_inserts(
238                requests,
239                Self::query_ctx(&table_info),
240                &self.statement_executor,
241                false,
242                false,
243            )
244            .await
245            .context(InsertPipelineSnafu)?;
246
247        info!(
248            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
249            name,
250            table_info.full_table_name(),
251            output
252        );
253
254        Ok(now)
255    }
256
257    /// Get a pipeline by name.
258    /// If the pipeline is not in the cache, it will be get from table and compiled and inserted into the cache.
259    pub async fn get_pipeline(
260        &self,
261        schema: &str,
262        name: &str,
263        version: PipelineVersion,
264    ) -> Result<Arc<Pipeline>> {
265        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
266            return Ok(pipeline);
267        }
268
269        let pipeline = self.get_pipeline_str(schema, name, version).await?;
270        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
271
272        self.cache
273            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
274        Ok(compiled_pipeline)
275    }
276
277    /// Get a original pipeline by name.
278    /// If the pipeline is not in the cache, it will be get from table and compiled and inserted into the cache.
279    pub async fn get_pipeline_str(
280        &self,
281        schema: &str,
282        name: &str,
283        version: PipelineVersion,
284    ) -> Result<(String, TimestampNanosecond)> {
285        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
286            return Ok(pipeline);
287        }
288
289        let mut pipeline_vec;
290        match self.find_pipeline(name, version).await {
291            Ok(p) => {
292                METRIC_PIPELINE_TABLE_FIND_COUNT
293                    .with_label_values(&["true"])
294                    .inc();
295                pipeline_vec = p;
296            }
297            Err(e) => {
298                match e {
299                    Error::CollectRecords { .. } => {
300                        // if collect records failed, it means the pipeline table is temporary invalid
301                        // we should use failover cache
302                        METRIC_PIPELINE_TABLE_FIND_COUNT
303                            .with_label_values(&["false"])
304                            .inc();
305                        return self
306                            .cache
307                            .get_failover_cache(schema, name, version)?
308                            .ok_or(PipelineNotFoundSnafu { name, version }.build());
309                    }
310                    _ => {
311                        // if other error, we should return it
312                        return Err(e);
313                    }
314                }
315            }
316        };
317        ensure!(
318            !pipeline_vec.is_empty(),
319            PipelineNotFoundSnafu { name, version }
320        );
321
322        // if the result is exact one, use it
323        if pipeline_vec.len() == 1 {
324            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
325            let p = (pipeline_content, version);
326            self.cache.insert_pipeline_str_cache(
327                &found_schema,
328                name,
329                Some(version),
330                p.clone(),
331                false,
332            );
333            return Ok(p);
334        }
335
336        // check if there's empty schema pipeline
337        // if there isn't, check current schema
338        let pipeline = pipeline_vec
339            .iter()
340            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
341            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
342
343        // multiple pipeline with no empty or current schema
344        // throw an error
345        let (pipeline_content, found_schema, version) =
346            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
347                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
348            })?;
349
350        let v = *version;
351        let p = (pipeline_content.clone(), v);
352        self.cache
353            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
354        Ok(p)
355    }
356
357    /// Insert a pipeline into the pipeline table and compile it.
358    /// The compiled pipeline will be inserted into the cache.
359    /// Newly created pipelines will be saved under empty schema.
360    pub async fn insert_and_compile(
361        &self,
362        name: &str,
363        content_type: &str,
364        pipeline: &str,
365    ) -> Result<PipelineInfo> {
366        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
367        // we will use the version in the future
368        let version = self
369            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
370            .await?;
371
372        {
373            self.cache.insert_pipeline_cache(
374                EMPTY_SCHEMA_NAME,
375                name,
376                Some(TimestampNanosecond(version)),
377                compiled_pipeline.clone(),
378                true,
379            );
380
381            self.cache.insert_pipeline_str_cache(
382                EMPTY_SCHEMA_NAME,
383                name,
384                Some(TimestampNanosecond(version)),
385                (pipeline.to_owned(), TimestampNanosecond(version)),
386                true,
387            );
388        }
389
390        Ok((version, compiled_pipeline))
391    }
392
393    pub async fn delete_pipeline(
394        &self,
395        name: &str,
396        version: PipelineVersion,
397    ) -> Result<Option<()>> {
398        // 0. version is ensured at the http api level not None
399        ensure!(
400            version.is_some(),
401            InvalidPipelineVersionSnafu { version: "None" }
402        );
403
404        // 1. check pipeline exist in catalog
405        let pipeline = self.find_pipeline(name, version).await?;
406        if pipeline.is_empty() {
407            return Ok(None);
408        }
409
410        // 2. prepare dataframe
411        let dataframe = self
412            .query_engine
413            .read_table(self.table.clone())
414            .context(DataFrameSnafu)?;
415
416        let dataframe = dataframe
417            .filter(prepare_dataframe_conditions(name, version))
418            .context(BuildDfLogicalPlanSnafu)?;
419
420        // 3. prepare dml stmt
421        let table_info = self.table.table_info();
422        let table_name = TableReference::full(
423            table_info.catalog_name.clone(),
424            table_info.schema_name.clone(),
425            table_info.name.clone(),
426        );
427
428        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
429        let table_source = Arc::new(DefaultTableSource::new(table_provider));
430
431        // create dml stmt
432        let stmt = DmlStatement::new(
433            table_name,
434            table_source,
435            datafusion_expr::WriteOp::Delete,
436            Arc::new(dataframe.into_parts().1),
437        );
438
439        let plan = LogicalPlan::Dml(stmt);
440
441        // 4. execute dml stmt
442        let output = self
443            .query_engine
444            .execute(plan, Self::query_ctx(&table_info))
445            .await
446            .context(ExecuteInternalStatementSnafu)?;
447
448        info!(
449            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
450            name,
451            version,
452            table_info.full_table_name(),
453            output
454        );
455
456        // remove cache with version and latest
457        self.cache.remove_cache(name, version);
458
459        Ok(Some(()))
460    }
461
462    // find all pipelines with name and version
463    // cloud be multiple with different schema
464    // return format: (pipeline content, schema, created_at)
465    async fn find_pipeline(
466        &self,
467        name: &str,
468        version: PipelineVersion,
469    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
470        // 1. prepare dataframe
471        let dataframe = self
472            .query_engine
473            .read_table(self.table.clone())
474            .context(DataFrameSnafu)?;
475
476        // select all pipelines with name and version
477        let dataframe = dataframe
478            .filter(prepare_dataframe_conditions(name, version))
479            .context(BuildDfLogicalPlanSnafu)?
480            .select_columns(&[
481                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
482                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
483                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
484            ])
485            .context(BuildDfLogicalPlanSnafu)?
486            .sort(vec![
487                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true),
488            ])
489            .context(BuildDfLogicalPlanSnafu)?;
490
491        let plan = dataframe.into_parts().1;
492
493        let table_info = self.table.table_info();
494
495        debug!("find_pipeline_by_name: plan: {:?}", plan);
496
497        // 2. execute plan
498        let output = self
499            .query_engine
500            .execute(plan, Self::query_ctx(&table_info))
501            .await
502            .context(ExecuteInternalStatementSnafu)?;
503        let stream = match output.data {
504            OutputData::Stream(stream) => stream,
505            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
506            _ => unreachable!(),
507        };
508
509        // 3. construct result
510        let records = record_util::collect(stream)
511            .await
512            .context(CollectRecordsSnafu)?;
513
514        if records.is_empty() {
515            return Ok(vec![]);
516        }
517
518        ensure!(
519            !records.is_empty() && records.iter().all(|r| r.num_columns() == 3),
520            PipelineNotFoundSnafu { name, version }
521        );
522
523        let mut re = Vec::with_capacity(records.len());
524        for r in records {
525            let pipeline_content_column = r.column(0);
526            let pipeline_content = pipeline_content_column
527                .as_string_opt::<i32>()
528                .with_context(|| CastTypeSnafu {
529                    msg: format!(
530                        "can't downcast {:?} array into string vector",
531                        pipeline_content_column.data_type()
532                    ),
533                })?;
534
535            let pipeline_schema_column = r.column(1);
536            let pipeline_schema =
537                pipeline_schema_column
538                    .as_string_opt::<i32>()
539                    .with_context(|| CastTypeSnafu {
540                        msg: format!(
541                            "expecting pipeline schema column of type string, actual: {}",
542                            pipeline_schema_column.data_type()
543                        ),
544                    })?;
545
546            let pipeline_created_at_column = r.column(2);
547            let pipeline_created_at = pipeline_created_at_column
548                .as_primitive_opt::<TimestampNanosecondType>()
549                .with_context(|| CastTypeSnafu {
550                    msg: format!(
551                        "can't downcast {:?} array into scalar vector",
552                        pipeline_created_at_column.data_type()
553                    ),
554                })?;
555
556            debug!(
557                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
558                pipeline_content, pipeline_schema, pipeline_created_at
559            );
560
561            ensure!(
562                pipeline_content.len() == pipeline_schema.len()
563                    && pipeline_schema.len() == pipeline_created_at.len(),
564                RecordBatchLenNotMatchSnafu
565            );
566
567            let len = pipeline_content.len();
568            for i in 0..len {
569                re.push((
570                    pipeline_content.value(i).to_string(),
571                    pipeline_schema.value(i).to_string(),
572                    TimestampNanosecond::new(pipeline_created_at.value(i)),
573                ));
574            }
575        }
576
577        Ok(re)
578    }
579}