use std::sync::Arc;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{col, DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::pipeline_cache::PipelineCache;
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::util::prepare_dataframe_conditions;

pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
pub(crate) const EMPTY_SCHEMA_NAME: &str = "";

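/// `PipelineTable` persists pipeline definitions in the `pipelines` table and
/// keeps compiled pipelines in an in-memory cache.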
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    cache: PipelineCache,
}

impl PipelineTable {
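    /// Creates a `PipelineTable` backed by the given inserter, statement
    /// executor, underlying table, and query engine.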
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            cache: PipelineCache::new(),
        }
    }

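    /// Builds the schema of the pipeline table: the time index column name,
    /// the primary key column names, and the column definitions.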
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

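    /// Builds the column schemas used when inserting a row into the pipeline table.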
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

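    /// Builds a query context targeting the catalog and schema of the pipeline table.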
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

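    /// Compiles a pipeline from its YAML source.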
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

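    /// Inserts a pipeline into the pipeline table and returns the creation
    /// timestamp, which also serves as the pipeline's version.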
    async fn insert_pipeline_to_pipeline_table(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
                false,
                false,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

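    /// Returns the compiled pipeline for the given schema, name, and version,
    /// serving it from the cache when possible and compiling on a cache miss.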
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.cache
            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
        Ok(compiled_pipeline)
    }

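    /// Returns the raw pipeline source and its version for the given schema,
    /// name, and version, serving it from the cache when possible.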
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }

        let mut pipeline_vec = self.find_pipeline(name, version).await?;
        ensure!(
            !pipeline_vec.is_empty(),
            PipelineNotFoundSnafu { name, version }
        );

        if pipeline_vec.len() == 1 {
            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
            let p = (pipeline_content, version);
            self.cache.insert_pipeline_str_cache(
                &found_schema,
                name,
                Some(version),
                p.clone(),
                false,
            );
            return Ok(p);
        }

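        // Multiple pipelines share this name across schemas: prefer the one
        // stored under the empty schema name, then fall back to the caller's schema.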
        let pipeline = pipeline_vec
            .iter()
            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));

        let (pipeline_content, found_schema, version) =
            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
            })?;

        let v = *version;
        let p = (pipeline_content.clone(), v);
        self.cache
            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
        Ok(p)
    }

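    /// Compiles the pipeline, inserts it into the pipeline table, and caches
    /// both the compiled pipeline and its source under the new version.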
    pub async fn insert_and_compile(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
            .await?;

        {
            self.cache.insert_pipeline_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                compiled_pipeline.clone(),
                true,
            );

            self.cache.insert_pipeline_str_cache(
                EMPTY_SCHEMA_NAME,
                name,
                Some(TimestampNanosecond(version)),
                (pipeline.to_owned(), TimestampNanosecond(version)),
                true,
            );
        }

        Ok((version, compiled_pipeline))
    }

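    /// Deletes the pipeline with the given name and version from the pipeline
    /// table. A concrete version is required; returns `Ok(None)` if no
    /// matching pipeline exists.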
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        let pipeline = self.find_pipeline(name, version).await?;
        if pipeline.is_empty() {
            return Ok(None);
        }

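        // Build a DELETE logical plan over the pipeline table, filtered by
        // pipeline name and version, and run it through the query engine.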
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        self.cache.remove_cache(name, version);

        Ok(Some(()))
    }

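    /// Queries the pipeline table for pipelines matching the name and version,
    /// returning `(pipeline content, schema, created_at)` tuples sorted by
    /// creation time in descending order.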
    async fn find_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

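        // Filter by name and version, project the content, schema, and
        // created_at columns, and sort by created_at descending so the newest
        // pipeline comes first.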
        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
            ])
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(vec![]);
        }

        ensure!(
            !records.is_empty() && records.iter().all(|r| r.num_columns() == 3),
            PipelineNotFoundSnafu { name, version }
        );

        let mut re = Vec::with_capacity(records.len());
        for r in records {
            let pipeline_content_column = r.column(0);
            let pipeline_content = pipeline_content_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_content_column.data_type()
                    ),
                })?;

            let pipeline_schema_column = r.column(1);
            let pipeline_schema = pipeline_schema_column
                .as_any()
                .downcast_ref::<StringVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into string vector",
                        pipeline_schema_column.data_type()
                    ),
                })?;

            let pipeline_created_at_column = r.column(2);
            let pipeline_created_at = pipeline_created_at_column
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .with_context(|| CastTypeSnafu {
                    msg: format!(
                        "can't downcast {:?} array into scalar vector",
                        pipeline_created_at_column.data_type()
                    ),
                })?;

            debug!(
                "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}",
                pipeline_content, pipeline_schema, pipeline_created_at
            );

            ensure!(
                pipeline_content.len() == pipeline_schema.len()
                    && pipeline_schema.len() == pipeline_created_at.len(),
                RecordBatchLenNotMatchSnafu
            );

            let len = pipeline_content.len();
            for i in 0..len {
                re.push((
                    pipeline_content.get_data(i).unwrap().to_string(),
                    pipeline_schema.get_data(i).unwrap().to_string(),
                    pipeline_created_at.get_data(i).unwrap(),
                ));
            }
        }

        Ok(re)
    }
}