use std::sync::Arc;
use std::time::Duration;

use api::v1::value::ValueData;
use api::v1::{
    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
    RowInsertRequests, Rows, SemanticType,
};
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::logical_expr::col;
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableInfo;
use table::TableRef;

use crate::error::{
    BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
    PipelineNotFoundSnafu, Result,
};
use crate::etl::{parse, Content, Pipeline};
use crate::manager::{PipelineInfo, PipelineVersion};
use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};
pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";

/// Maximum number of entries kept in each pipeline cache.
const PIPELINES_CACHE_SIZE: u64 = 10000;
/// Time-to-live of cached pipeline entries.
const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);

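/// A table that stores pipeline definitions, together with in-memory caches
/// for the compiled pipelines and their original sources.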
pub struct PipelineTable {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    table: TableRef,
    query_engine: QueryEngineRef,
    /// Compiled pipelines, keyed by schema, name and version.
    pipelines: Cache<String, Arc<Pipeline>>,
    /// Original pipeline sources and their creation timestamps, keyed the same way.
    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
}

impl PipelineTable {
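    /// Creates a new `PipelineTable` backed by the given table, inserter,
    /// statement executor and query engine, with size- and TTL-bounded caches.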
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        table: TableRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            table,
            query_engine,
            pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .build(),
            original_pipelines: Cache::builder()
                .max_capacity(PIPELINES_CACHE_SIZE)
                .time_to_live(PIPELINES_CACHE_TTL)
                .build(),
        }
    }

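    /// Builds the schema of the pipeline table.
    ///
    /// Returns the timestamp-index column name, the primary-key column names
    /// (schema, name and content type), and the full column definitions.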
    pub fn build_pipeline_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
        (
            PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
            vec![
                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
            ],
            vec![
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Tag as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Field as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
                ColumnDef {
                    name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                    data_type: ColumnDataType::TimestampNanosecond as i32,
                    is_nullable: false,
                    default_constraint: vec![],
                    semantic_type: SemanticType::Timestamp as i32,
                    comment: "".to_string(),
                    datatype_extension: None,
                    options: None,
                },
            ],
        )
    }

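    /// Builds the column schemas used to insert a row into the pipeline table.
    /// The column order here must match the value order in
    /// `insert_pipeline_to_pipeline_table`.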
    fn build_insert_column_schemas() -> Vec<PbColumnSchema> {
        vec![
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Tag.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String.into(),
                semantic_type: SemanticType::Field.into(),
                ..Default::default()
            },
            PbColumnSchema {
                column_name: PIPELINE_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampNanosecond.into(),
                semantic_type: SemanticType::Timestamp.into(),
                ..Default::default()
            },
        ]
    }

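    /// Builds a query context scoped to the catalog and schema that the
    /// pipeline table lives in.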
    fn query_ctx(table_info: &TableInfo) -> QueryContextRef {
        QueryContextBuilder::default()
            .current_catalog(table_info.catalog_name.to_string())
            .current_schema(table_info.schema_name.to_string())
            .build()
            .into()
    }

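    /// Compiles a YAML pipeline definition into a [`Pipeline`].
    ///
    /// A minimal usage sketch; the YAML below is illustrative only, and the
    /// available processors and transforms depend on the pipeline spec:
    ///
    /// ```ignore
    /// let yaml = r#"
    /// processors:
    ///   - date:
    ///       field: time
    ///       formats:
    ///         - "%Y-%m-%d %H:%M:%S%.3f"
    /// transform:
    ///   - fields:
    ///       - message
    ///     type: string
    /// "#;
    /// let pipeline = PipelineTable::compile_pipeline(yaml)?;
    /// ```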
    pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline> {
        let yaml_content = Content::Yaml(pipeline);
        parse(&yaml_content)
    }

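    /// Inserts one pipeline row into the pipeline table and returns the
    /// creation timestamp, which doubles as the pipeline version.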
    async fn insert_pipeline_to_pipeline_table(
        &self,
        schema: &str,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<Timestamp> {
        let now = Timestamp::current_time(TimeUnit::Nanosecond);

        let table_info = self.table.table_info();

        let insert = RowInsertRequest {
            table_name: PIPELINE_TABLE_NAME.to_string(),
            rows: Some(Rows {
                schema: Self::build_insert_column_schemas(),
                rows: vec![Row {
                    values: vec![
                        ValueData::StringValue(name.to_string()).into(),
                        ValueData::StringValue(schema.to_string()).into(),
                        ValueData::StringValue(content_type.to_string()).into(),
                        ValueData::StringValue(pipeline.to_string()).into(),
                        ValueData::TimestampNanosecondValue(now.value()).into(),
                    ],
                }],
            }),
        };

        let requests = RowInsertRequests {
            inserts: vec![insert],
        };

        let output = self
            .inserter
            .handle_row_inserts(
                requests,
                Self::query_ctx(&table_info),
                &self.statement_executor,
            )
            .await
            .context(InsertPipelineSnafu)?;

        info!(
            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
            name,
            table_info.full_table_name(),
            output
        );

        Ok(now)
    }

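    /// Gets a compiled pipeline by schema, name and version, serving it from
    /// the cache when possible and otherwise loading and compiling it from
    /// the table.
    ///
    /// A usage sketch (identifiers are illustrative):
    ///
    /// ```ignore
    /// // A version of `None` means "the latest version".
    /// let pipeline = pipeline_table.get_pipeline("public", "my_pipeline", None).await?;
    /// ```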
    pub async fn get_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline>> {
        if let Some(pipeline) = self
            .pipelines
            .get(&generate_pipeline_cache_key(schema, name, version))
        {
            return Ok(pipeline);
        }

        let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);

        self.pipelines.insert(
            generate_pipeline_cache_key(schema, name, version),
            compiled_pipeline.clone(),
        );
        Ok(compiled_pipeline)
    }

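    /// Gets the original pipeline source and its creation timestamp, serving
    /// it from the cache when possible and otherwise looking it up in the
    /// pipeline table.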
    pub async fn get_pipeline_str(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<(String, TimestampNanosecond)> {
        if let Some(pipeline) = self
            .original_pipelines
            .get(&generate_pipeline_cache_key(schema, name, version))
        {
            return Ok(pipeline);
        }
        let pipeline = self
            .find_pipeline(schema, name, version)
            .await?
            .context(PipelineNotFoundSnafu { name, version })?;
        self.original_pipelines.insert(
            generate_pipeline_cache_key(schema, name, version),
            pipeline.clone(),
        );
        Ok(pipeline)
    }

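    /// Compiles a pipeline, inserts it into the pipeline table, and caches it
    /// under both the "latest" key and its concrete versioned key.
    ///
    /// A usage sketch (`yaml_str` is an illustrative pipeline source):
    ///
    /// ```ignore
    /// let (version, compiled) = pipeline_table
    ///     .insert_and_compile("public", "my_pipeline", "yaml", yaml_str)
    ///     .await?;
    /// ```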
    pub async fn insert_and_compile(
        &self,
        schema: &str,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo> {
        let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
        let version = self
            .insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline)
            .await?;

        {
            // Cache the new pipeline under both the "latest" key (version
            // `None`) and its concrete versioned key.
            self.pipelines.insert(
                generate_pipeline_cache_key(schema, name, None),
                compiled_pipeline.clone(),
            );
            self.pipelines.insert(
                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
                compiled_pipeline.clone(),
            );

            self.original_pipelines.insert(
                generate_pipeline_cache_key(schema, name, None),
                (pipeline.to_owned(), TimestampNanosecond(version)),
            );
            self.original_pipelines.insert(
                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
                (pipeline.to_owned(), TimestampNanosecond(version)),
            );
        }

        Ok((version, compiled_pipeline))
    }

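    /// Deletes the pipeline with the given version from the pipeline table
    /// and invalidates the corresponding cache entries. A concrete version is
    /// required; `None` is rejected as an invalid version. Returns `Ok(None)`
    /// when no matching pipeline exists.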
    pub async fn delete_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<()>> {
        // Deleting a pipeline requires a concrete version.
        ensure!(
            version.is_some(),
            InvalidPipelineVersionSnafu { version: "None" }
        );

        // Check that the pipeline exists before issuing the delete.
        let pipeline = self.find_pipeline(schema, name, version).await?;
        if pipeline.is_none() {
            return Ok(None);
        }

        // Build a dataframe over the pipeline table, filtered down to the
        // rows to delete.
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(schema, name, version))
            .context(BuildDfLogicalPlanSnafu)?;

        let table_info = self.table.table_info();
        let table_name = TableReference::full(
            table_info.catalog_name.clone(),
            table_info.schema_name.clone(),
            table_info.name.clone(),
        );

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .to_dfschema()
                .context(BuildDfLogicalPlanSnafu)?,
        );

        // Wrap the filtered plan in a DML delete statement and execute it.
        let stmt = DmlStatement::new(
            table_name,
            df_schema,
            datafusion_expr::WriteOp::Delete,
            Arc::new(dataframe.into_parts().1),
        );

        let plan = LogicalPlan::Dml(stmt);

        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;

        info!(
            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
            name,
            version,
            table_info.full_table_name(),
            output
        );

        // Invalidate both the versioned and the "latest" cache entries.
        self.pipelines
            .remove(&generate_pipeline_cache_key(schema, name, version));
        self.pipelines
            .remove(&generate_pipeline_cache_key(schema, name, None));
        self.original_pipelines
            .remove(&generate_pipeline_cache_key(schema, name, version));
        self.original_pipelines
            .remove(&generate_pipeline_cache_key(schema, name, None));

        Ok(Some(()))
    }

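    /// Queries the pipeline table for a pipeline matching the given schema,
    /// name and version, returning the newest matching source and its
    /// creation timestamp, or `None` when nothing matches.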
    async fn find_pipeline(
        &self,
        schema: &str,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Option<(String, TimestampNanosecond)>> {
        // Build the query plan: filter by schema/name/version, project the
        // content and created_at columns, and keep only the newest row.
        let dataframe = self
            .query_engine
            .read_table(self.table.clone())
            .context(DataFrameSnafu)?;
        let DataFrame::DataFusion(dataframe) = dataframe;

        let dataframe = dataframe
            .filter(prepare_dataframe_conditions(schema, name, version))
            .context(BuildDfLogicalPlanSnafu)?
            .select_columns(&[
                PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
                PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .sort(vec![
                // Sort by created_at descending so the first row is the latest.
                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true),
            ])
            .context(BuildDfLogicalPlanSnafu)?
            .limit(0, Some(1))
            .context(BuildDfLogicalPlanSnafu)?;

        let plan = dataframe.into_parts().1;

        let table_info = self.table.table_info();

        debug!("find_pipeline_by_name: plan: {:?}", plan);

        // Execute the plan and collect the resulting record batches.
        let output = self
            .query_engine
            .execute(plan, Self::query_ctx(&table_info))
            .await
            .context(ExecuteInternalStatementSnafu)?;
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let records = record_util::collect(stream)
            .await
            .context(CollectRecordsSnafu)?;

        if records.is_empty() {
            return Ok(None);
        }

        // Expect exactly one batch carrying the two projected columns.
        ensure!(
            records.len() == 1 && records[0].num_columns() == 2,
            PipelineNotFoundSnafu { name, version }
        );

        let pipeline_content_column = records[0].column(0);
        let pipeline_content = pipeline_content_column
            .as_any()
            .downcast_ref::<StringVector>()
            .with_context(|| CastTypeSnafu {
                msg: format!(
                    "can't downcast {:?} array into string vector",
                    pipeline_content_column.data_type()
                ),
            })?;

        let pipeline_created_at_column = records[0].column(1);
        let pipeline_created_at = pipeline_created_at_column
            .as_any()
            .downcast_ref::<TimestampNanosecondVector>()
            .with_context(|| CastTypeSnafu {
                msg: format!(
                    "can't downcast {:?} array into timestamp vector",
                    pipeline_created_at_column.data_type()
                ),
            })?;

        debug!(
            "find_pipeline_by_name: pipeline_content: {:?}, pipeline_created_at: {:?}",
            pipeline_content, pipeline_created_at
        );

        ensure!(
            pipeline_content.len() == 1,
            PipelineNotFoundSnafu { name, version }
        );

        Ok(Some((
            pipeline_content.get_data(0).unwrap().to_string(),
            pipeline_created_at.get_data(0).unwrap(),
        )))
    }
}