pipeline/manager/table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

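//! Defines [`PipelineTable`], the per-catalog `pipelines` system table that
//! stores pipeline definitions, along with the logic to insert, find, compile,
//! cache, and delete them.
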
use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use arrow::array::{Array, AsArray};
use arrow::datatypes::TimestampNanosecondType;
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{Content, Pipeline, parse};
use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

/// PipelineTable is a table that stores the pipeline schema and content.
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
    /// Create a new PipelineTable.
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

    /// Build the schema for the pipeline table.
    /// Returns the (time index, primary keys, column) definitions.
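    /// For the pipeline table these are `created_at` as the time index and
    /// (`schema`, `name`, `content_type`) as the primary keys.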
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

    /// Build the column schemas for inserting a row into the pipeline table.
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.clone())
            .current_schema(table_info.schema_name.clone())
            .build()
            .into()
    }

    /// Compile a pipeline from a string.
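    ///
    /// # Example
    ///
    /// A minimal sketch; the accepted YAML grammar is defined by the parser in
    /// `crate::etl`, so the body below is only illustrative:
    ///
    /// ```ignore
    /// let yaml = r#"
    /// processors: []
    /// transform:
    ///   - fields:
    ///       - message
    ///     type: string
    /// "#;
    /// let pipeline = PipelineTable::compile_pipeline(yaml)?;
    /// ```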
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

    /// Insert a pipeline into the pipeline table.
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

    /// Get a pipeline by name.
    /// If the pipeline is not in the cache, it is fetched from the pipeline
    /// table, compiled, and inserted into the cache.
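    ///
    /// # Example
    ///
    /// A usage sketch (names are illustrative); passing `None` as the version
    /// resolves to the latest pipeline:
    ///
    /// ```ignore
    /// let pipeline = pipeline_table
    ///     .get_pipeline("my_schema", "my_pipeline", None)
    ///     .await?;
    /// ```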
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        input_version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
            return Ok(pipeline);
        }

        let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);

        self.cache.insert_pipeline_cache(
            &pipeline_content.schema,
            name,
            Some(pipeline_content.version),
            compiled_pipeline.clone(),
            input_version.is_none(),
        );
        Ok(compiled_pipeline)
    }

    /// Get the original pipeline content by name.
    /// If it is not in the cache, it is fetched from the pipeline table and
    /// inserted into the cache.
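    ///
    /// When several pipelines share the same name under different schemas, one
    /// stored under the empty schema wins, then one under the caller's schema;
    /// otherwise a `MultiPipelineWithDiffSchema` error is returned.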
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        input_version: PipelineVersion,
    ) -> Result<PipelineContent> {
        if let Some(pipeline) = self
            .cache
            .get_pipeline_str_cache(schema, name, input_version)?
        {
            return Ok(pipeline);
        }

        let mut pipeline_vec;
        match self.find_pipeline(name, input_version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                pipeline_vec = p;
            }
            Err(e) => {
                match e {
                    Error::CollectRecords { .. } => {
                        // Collecting records failed, meaning the pipeline table is
                        // temporarily unavailable; fall back to the failover cache.
                        METRIC_PIPELINE_TABLE_FIND_COUNT
                            .with_label_values(&["false"])
                            .inc();
                        return self
                            .cache
                            .get_failover_cache(schema, name, input_version)?
                            .context(PipelineNotFoundSnafu {
                                name,
                                version: input_version,
                            });
                    }
                    _ => {
                        // Any other error is propagated as-is.
                        return Err(e);
                    }
                }
            }
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu {
                name,
                version: input_version
            }
        );

        // If there is exactly one result, use it.
        if pipeline_vec.len() == 1 {
            let pipeline_content = pipeline_vec.remove(0);

            self.cache
                .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
            return Ok(pipeline_content);
        }

        // Prefer a pipeline stored under the empty schema;
        // if there is none, fall back to the current schema.
        let pipeline = pipeline_vec
            .iter()
            .position(|v| v.schema == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
            .map(|idx| pipeline_vec.remove(idx));

        // Multiple pipelines, none under the empty or the current schema:
        // return an error.
        let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
            name: name.to_string(),
            current_schema: schema.to_string(),
            schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
        })?;

        self.cache
            .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
        Ok(pipeline_content)
    }

    /// Insert a pipeline into the pipeline table and compile it.
    /// The compiled pipeline will be inserted into the cache.
    /// Newly created pipelines are saved under the empty schema.
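    ///
    /// # Example
    ///
    /// A usage sketch (names are illustrative):
    ///
    /// ```ignore
    /// let (version, pipeline) = pipeline_table
    ///     .insert_and_compile("my_pipeline", "yaml", yaml_content)
    ///     .await?;
    /// ```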
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        // we will use the version in the future
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            let pipeline_content = PipelineContent {
                name: name.to_string(),
                content: pipeline.to_string(),
                version: TimestampNanosecond(version),
                schema: EMPTY_SCHEMA_NAME.to_string(),
            };

            self.cache
                .insert_pipeline_str_cache(&pipeline_content, true);
        }

        Ok((version, compiled_pipeline))
    }

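    /// Delete a pipeline by name and version from the pipeline table, evicting
    /// both the versioned and the latest entries from the cache.
    /// The version must be `Some` (ensured at the HTTP API level); returns
    /// `Ok(None)` if the pipeline does not exist.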
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // 0. the version is guaranteed to be not None at the HTTP API level
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // 1. check that the pipeline exists in the catalog
        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        // 2. prepare the dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        // 3. prepare the DML statement
        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        // create the DML statement
        let stmt = DmlStatement::new(
            table_name,
            table_source,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        // 4. execute the DML statement
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // remove both the versioned and the latest cache entries
        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

    // Find all pipelines with the given name and version.
    // There could be multiple entries under different schemas.
    // The query selects (pipeline content, schema, created_at) per row.
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<PipelineContent>> {
        // 1. prepare the dataframe
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        // select all pipelines with the given name and version
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true),
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        // 2. execute the plan
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        // 3. construct the result
        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        // every record batch must contain exactly the three selected columns
        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_string_opt::<i32>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema =
                pipeline_schema_column
                    .as_string_opt::<i32>()
                    .with_context(|| CastTypeSnafu {
                        msg: format!(
                            "expecting pipeline schema column of type string, actual: {}",
                            pipeline_schema_column.data_type()
                        ),
                    })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_primitive_opt::<TimestampNanosecondType>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into scalar vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push(PipelineContent {
                    name: name.to_string(),
                    content: pipeline_content.value(i).to_string(),
                    version: TimestampNanosecond::new(pipeline_created_at.value(i)),
                    schema: pipeline_schema.value(i).to_string(),
                });
            }
        }

        Ok(re)
    }
}