use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use arrow::array::{Array, AsArray};
use arrow::datatypes::TimestampNanosecondType;
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::metadata::TableInfo;
use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, Error,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{Content, Pipeline, parse};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
use crate::util::prepare_dataframe_conditions;
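
// Names of the pipeline storage table and its columns.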
pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
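
/// Storage and lookup for pipeline definitions: wraps the underlying
/// `pipelines` table together with the handles needed to read and write it,
/// plus an in-memory cache of compiled pipelines.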
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
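    /// Creates a `PipelineTable` from its parts, starting with an empty cache.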
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }
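
    /// Builds the schema of the pipeline table: returns the time-index column
    /// name, the primary-key column names, and the column definitions.
    /// `name`, `schema` and `content_type` are tags, the pipeline content is a
    /// field, and `created_at` is the timestamp index.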
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }
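
    /// Builds the column schemas for inserting a row into the pipeline table.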
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }
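
    /// Builds a query context bound to the catalog and schema of the given table.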
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.clone())
            .current_schema(table_info.schema_name.clone())
            .build()
            .into()
    }
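
    /// Compiles a pipeline from a YAML definition string.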
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }
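
    /// Inserts a pipeline row (name, schema, content type and content) into the
    /// pipeline table, using the current time as both the `created_at` value
    /// and the returned version timestamp.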
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }
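
    /// Returns the compiled pipeline for `(schema, name, version)`, consulting
    /// the cache first and compiling and caching the result on a miss.
    ///
    /// A minimal usage sketch (assuming an initialized `PipelineTable` holding
    /// a pipeline named "my_pipeline"):
    ///
    /// ```ignore
    /// // Version `None` does not pin a specific version.
    /// let pipeline = table.get_pipeline("public", "my_pipeline", None).await?;
    /// ```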
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }
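
    /// Returns the raw pipeline content and its version timestamp for
    /// `(schema, name, version)`, checking the cache first. If scanning the
    /// table fails while collecting records, falls back to the failover cache.
    /// When several pipelines with the same name exist under different schemas,
    /// prefers the one stored under the empty schema name, then the caller's
    /// schema, and errors otherwise.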
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec = match self.find_pipeline(name, version).await {
            Ok(p) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["true"])
                    .inc();
                p
            }
            Err(Error::CollectRecords { .. }) => {
                METRIC_PIPELINE_TABLE_FIND_COUNT
                    .with_label_values(&["false"])
                    .inc();
                return self
                    .cache
                    .get_failover_cache(schema, name, version)?
                    .ok_or(PipelineNotFoundSnafu { name, version }.build());
            }
            Err(e) => return Err(e),
        };
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }
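
        // Multiple pipelines share this name across schemas: prefer the one
        // stored under the empty schema name, then the caller's schema.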
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }
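
    /// Compiles `pipeline`, writes it to the pipeline table, and fills both
    /// caches under the empty schema name. Returns the creation timestamp
    /// (used as the pipeline version) together with the compiled pipeline.
    ///
    /// A minimal usage sketch; `yaml_src` stands in for a YAML pipeline
    /// definition string:
    ///
    /// ```ignore
    /// let (version, compiled) = table
    ///     .insert_and_compile("my_pipeline", "yaml", yaml_src)
    ///     .await?;
    /// ```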
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        self.cache.insert_pipeline_cache(
            EMPTY_SCHEMA_NAME,
            name,
            Some(TimestampNanosecond(version)),
            compiled_pipeline.clone(),
            true,
        );

        self.cache.insert_pipeline_str_cache(
            EMPTY_SCHEMA_NAME,
            name,
            Some(TimestampNanosecond(version)),
            (pipeline.to_owned(), TimestampNanosecond(version)),
            true,
        );

        Ok((version, compiled_pipeline))
    }
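
    /// Deletes the pipeline with the given name and version from the table and
    /// evicts it from the caches. A concrete version is required; returns
    /// `Ok(None)` when no matching pipeline exists.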
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let stmt = DmlStatement::new(
            table_name,
            table_source,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }
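
    /// Scans the pipeline table for rows matching `name` (and `version`, when
    /// given) and returns `(content, schema, created_at)` tuples, sorted by
    /// `created_at` in descending order.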
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true),
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        ensure!(
            records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_string_opt::<i32>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast pipeline content column ({:?}) into string array",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_string_opt::<i32>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast pipeline schema column ({:?}) into string array",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_primitive_opt::<TimestampNanosecondType>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast created_at column ({:?}) into timestamp array",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.value(i).to_string(),
                    pipeline_schema.value(i).to_string(),
                    TimestampNanosecond::new(pipeline_created_at.value(i)),
                ));
            }
        }

        Ok(re)
    }
}