use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use arrow::array::{Array, AsArray};
use arrow::datatypes::TimestampNanosecondType;
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{Content, Pipeline, parse};
use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;

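// Name of the internal pipeline table and its column names.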
pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

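/// `PipelineTable` stores pipelines in a database table and provides APIs to
/// insert, compile, query, and delete them. Lookups go through a
/// [`PipelineCache`] to avoid repeated table scans and recompilation.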
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
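    /// Creates a new [`PipelineTable`] with an empty cache.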
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

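    /// Builds the schema of the pipeline table: the time index column name,
    /// the primary key column names, and the column definitions.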
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

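    /// Builds the column schemas for inserting a row into the pipeline table.
    /// The column order must match the row values built in
    /// `insert_pipeline_to_pipeline_table`.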
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

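    /// Builds a query context targeting the catalog and schema that the
    /// pipeline table lives in.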
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.clone())
            .current_schema(table_info.schema_name.clone())
            .build()
            .into()
    }

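    /// Compiles a YAML pipeline definition into a [`Pipeline`].
    ///
    /// A minimal sketch (`yaml` is an illustrative variable assumed to hold a
    /// valid pipeline definition):
    ///
    /// ```ignore
    /// let pipeline = PipelineTable::compile_pipeline(yaml)?;
    /// ```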
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

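    /// Inserts a pipeline into the pipeline table under the empty schema and
    /// returns the creation timestamp, which doubles as the pipeline version.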
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

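    /// Gets a compiled pipeline by schema, name, and version. On a cache miss
    /// the pipeline is fetched from the table, compiled, and cached.
    ///
    /// A minimal usage sketch (`table` is an illustrative variable holding an
    /// initialized `PipelineTable`):
    ///
    /// ```ignore
    /// // Passing `None` as the version resolves to the latest pipeline.
    /// let pipeline = table.get_pipeline("my_schema", "my_pipeline", None).await?;
    /// ```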
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        input_version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
            return Ok(pipeline);
        }

        let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);

        self.cache.insert_pipeline_cache(
            &pipeline_content.schema,
            name,
            Some(pipeline_content.version),
            compiled_pipeline.clone(),
            input_version.is_none(),
        );
        Ok(compiled_pipeline)
    }

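    /// Gets the raw pipeline content by schema, name, and version, consulting
    /// the string cache before scanning the table. When pipelines with the
    /// same name exist in multiple schemas, one under the empty schema wins,
    /// then one under the current schema; any remaining ambiguity is an error.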
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        input_version: PipelineVersion,
    ) -> Result<PipelineContent> {
        if let Some(pipeline) = self
            .cache
            .get_pipeline_str_cache(schema, name, input_version)?
        {
            return Ok(pipeline);
        }

        let mut pipeline_vec = match self.find_pipeline(name, input_version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                p
            }
            Err(Error::CollectRecords { .. }) => {
                // The table scan failed; fall back to the failover cache.
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["false"])
                    .inc();
                return self
                    .cache
                    .get_failover_cache(schema, name, input_version)?
                    .context(PipelineNotFoundSnafu {
                        name,
                        version: input_version,
                    });
            }
            Err(e) => return Err(e),
        };

        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu {
                name,
                version: input_version
            }
        );

        if pipeline_vec.len() == 1 {
            let pipeline_content = pipeline_vec.remove(0);
            self.cache
                .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
            return Ok(pipeline_content);
        }

        // Multiple pipelines share the name: prefer the one defined under the
        // empty schema, then the one under the current schema.
        let pipeline = pipeline_vec
            .iter()
            .position(|v| v.schema == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
            .map(|idx| pipeline_vec.remove(idx));

        let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
            name: name.to_string(),
            current_schema: schema.to_string(),
            schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
        })?;

        self.cache
            .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
        Ok(pipeline_content)
    }

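    /// Compiles a pipeline and inserts it into the pipeline table, then
    /// populates both the compiled cache and the string cache with the new
    /// version.
    ///
    /// An illustrative call (`table` and `pipeline_yaml` are assumed, not part
    /// of this API):
    ///
    /// ```ignore
    /// let (version, compiled) = table
    ///     .insert_and_compile("my_pipeline", "yaml", pipeline_yaml)
    ///     .await?;
    /// ```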
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        self.cache.insert_pipeline_cache(
            EMPTY_SCHEMA_NAME,
            name,
            Some(TimestampNanosecond(version)),
            compiled_pipeline.clone(),
            true,
        );

        let pipeline_content = PipelineContent {
            name: name.to_string(),
            content: pipeline.to_string(),
            version: TimestampNanosecond(version),
            schema: EMPTY_SCHEMA_NAME.to_string(),
        };

        self.cache
            .insert_pipeline_str_cache(&pipeline_content, true);

        Ok((version, compiled_pipeline))
    }

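    /// Deletes a pipeline by name and version. A concrete version is
    /// required; returns `Ok(None)` when no matching pipeline exists. The
    /// deletion runs as a DataFusion DML plan, and the matching cache entries
    /// are removed afterwards.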
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let stmt = DmlStatement::new(
            table_name,
            table_source,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

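    /// Scans the pipeline table for all pipelines matching the name and
    /// optional version, ordered by creation time descending, and converts
    /// the resulting record batches into [`PipelineContent`] values.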
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<PipelineContent>> {
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true),
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline: plan: {:?}", plan);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        // Each batch must carry exactly the three selected columns.
        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_string_opt::<i32>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_string_opt::<i32>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "expecting pipeline schema column of type string, actual: {:?}",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_primitive_opt::<TimestampNanosecondType>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into timestamp vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push(PipelineContent {
                    name: name.to_string(),
                    content: pipeline_content.value(i).to_string(),
                    version: TimestampNanosecond::new(pipeline_created_at.value(i)),
                    schema: pipeline_schema.value(i).to_string(),
                });
            }
        }

        Ok(re)
    }
}