use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{col, DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

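/// `PipelineTable` is the storage and cache layer for pipelines: pipeline
/// definitions are persisted in a dedicated table, and compiled pipelines
/// are served from an in-memory cache in front of it.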
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
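    /// Create a `PipelineTable` backed by the given table, with an empty cache.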
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

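    /// Build the schema of the pipeline table: the time-index column name,
    /// the primary-key column names, and the column definitions.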
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

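    /// Build the column schemas for inserting a row into the pipeline table.
    /// The order matches the row values built in `insert_pipeline_to_pipeline_table`.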
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

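    /// Build a query context targeting the catalog and schema that the
    /// pipeline table lives in.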
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

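    /// Compile a pipeline from a YAML definition string.
    ///
    /// A minimal usage sketch (the YAML content is hypothetical and must be a
    /// valid pipeline definition for `parse` to succeed):
    ///
    /// ```ignore
    /// let compiled: Pipeline = PipelineTable::compile_pipeline(yaml_str)?;
    /// ```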
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

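    /// Insert a pipeline into the pipeline table and return its creation
    /// timestamp, which also serves as the pipeline version.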
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

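    /// Get a compiled pipeline by schema, name, and version. Cache hits are
    /// returned directly; on a miss the pipeline string is fetched from the
    /// table, compiled, and cached.
    ///
    /// Illustrative call (hypothetical names; `None` selects the latest version):
    ///
    /// ```ignore
    /// let pipeline = pipeline_table.get_pipeline("public", "my_pipeline", None).await?;
    /// ```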
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

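    /// Get the raw pipeline string and its version by schema, name, and
    /// version, consulting the string cache first and falling back to the
    /// pipeline table.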
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        // Read from the pipeline table; fall back to the failover cache only
        // when collecting records from the table fails.
        let mut pipeline_vec = match self.find_pipeline(name, version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                p
            }
            Err(Error::CollectRecords { .. }) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["false"])
                    .inc();
                return self
                    .cache
                    .get_failover_cache(schema, name, version)?
                    .ok_or(PipelineNotFoundSnafu { name, version }.build());
            }
            Err(e) => return Err(e),
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

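        // Fast path: exactly one pipeline matched; cache it under the schema
        // it was found in and return it.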
        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

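        // Multiple pipelines share the same name across schemas: prefer the
        // one stored under the empty (global) schema, then the caller's
        // schema; otherwise report the conflicting schemas.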
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

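    /// Compile a pipeline, insert it into the pipeline table, and populate
    /// both caches under the empty schema. Returns the version timestamp and
    /// the compiled pipeline.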
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        self.cache.insert_pipeline_cache(
            EMPTY_SCHEMA_NAME,
            name,
            Some(TimestampNanosecond(version)),
            compiled_pipeline.clone(),
            true,
        );

        self.cache.insert_pipeline_str_cache(
            EMPTY_SCHEMA_NAME,
            name,
            Some(TimestampNanosecond(version)),
            (pipeline.to_owned(), TimestampNanosecond(version)),
            true,
        );

        Ok((version, compiled_pipeline))
    }

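    /// Delete a pipeline by name and version from the pipeline table and
    /// evict it from the caches. A concrete version is required; returns
    /// `Ok(None)` when no matching pipeline exists.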
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

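        // Wrap the filtered scan in a DELETE DML statement and execute it
        // through the query engine.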
        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

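    /// Query the pipeline table for all pipelines matching `name` and
    /// `version`, newest first. Returns `(content, schema, created_at)`
    /// tuples.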
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        // `records` is non-empty here; every batch must carry the three
        // projected columns.
        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

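        // Downcast the three projected columns of each record batch and zip
        // the rows into result tuples.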
        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into timestamp nanosecond vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            // All three columns are declared non-nullable, so `get_data` is
            // safe to unwrap.
            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}