use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
/// Pipelines stored under the empty schema are treated as global and are
/// preferred over schema-local ones when names collide.
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

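/// `PipelineTable` fronts the `pipelines` system table: it inserts, finds,
/// deletes, compiles, and caches pipeline definitions.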
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

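    /// Build the schema of the pipeline table: the time-index column name,
    /// the primary-key (tag) column names, and the column definitions.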
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

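    /// Build the gRPC column schemas used when inserting a pipeline row.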
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

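    /// Build a query context targeting the catalog and schema that hold the
    /// pipeline table.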
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

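    /// Compile a pipeline definition from its YAML source.
    ///
    /// A minimal sketch of a call site (illustrative only; `pipeline_yaml`
    /// stands in for a pipeline document the etl parser accepts):
    ///
    /// ```ignore
    /// let pipeline = PipelineTable::compile_pipeline(pipeline_yaml)?;
    /// ```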
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

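    /// Insert a single pipeline row (name, schema, content type, content,
    /// created_at) into the pipeline table and return the creation timestamp,
    /// which doubles as the pipeline version.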
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

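    /// Get a compiled pipeline by schema, name, and version.
    /// Checks the cache first; on a miss, loads the source from the table,
    /// compiles it, and caches the result.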
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

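    /// Get the raw pipeline source and its version timestamp.
    /// Checks the cache first, then queries the pipeline table; if collecting
    /// the query result fails, falls back to the failover cache.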
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec;
        match self.find_pipeline(name, version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                pipeline_vec = p;
            }
            Err(e) => {
                match e {
                    Error::CollectRecords { .. } => {
                        // The table query failed while collecting records;
                        // try the failover cache before giving up.
                        METRIC_PIPELINE_TABLE_FIND_COUNT
                            .with_label_values(&["false"])
                            .inc();
                        return self
                            .cache
                            .get_failover_cache(schema, name, version)?
                            .ok_or(PipelineNotFoundSnafu { name, version }.build());
                    }
                    _ => {
                        return Err(e);
                    }
                }
            }
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        // Exactly one match: cache it and return.
        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

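        // Multiple schemas define a pipeline with this name: prefer the one
        // under the empty (global) schema, then the one matching the request
        // schema; anything else is ambiguous and becomes an error.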
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

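    /// Compile a pipeline, insert it into the pipeline table, and populate
    /// both caches under the global (empty) schema.
    /// Returns the version timestamp together with the compiled pipeline.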
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            self.cache.insert_pipeline_str_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                (pipeline.to_owned(), TimestampNanosecond(version)),
                true,
            );
        }

        Ok((version, compiled_pipeline))
    }

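    /// Delete a pipeline by name and version; a concrete version is required.
    /// Returns `Ok(None)` when no matching pipeline exists.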
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        // Wrap the filtered scan in a DELETE DML statement and execute it
        // through the query engine.
        let stmt = DmlStatement::new(
            table_name,
            table_source,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // Drop the deleted pipeline from the caches as well.
        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

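    /// Query the pipeline table for `(content, schema, created_at)` tuples
    /// matching the name and optional version, newest first.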
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        // Filter by name (and version, if given), project the three columns
        // we need, and sort by created_at descending so the newest comes first.
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        // Every batch must carry exactly the three projected columns.
        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            // Column 0: pipeline content.
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            // Column 1: the schema the pipeline was created under.
            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            // Column 2: creation timestamp, i.e. the pipeline version.
            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into scalar vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}