use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use futures::FutureExt;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::TableRef;

use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
use crate::metrics::{
    METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
    METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
};
use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
use crate::{GreptimeTransformer, Pipeline};

/// [`PipelineOperator`] is the entry point for pipeline management: it lazily
/// creates the system pipeline table for each catalog and caches a
/// [`PipelineTableRef`] per catalog for subsequent lookups.
pub struct PipelineOperator {
    inserter: InserterRef,
    statement_executor: StatementExecutorRef,
    catalog_manager: CatalogManagerRef,
    query_engine: QueryEngineRef,
    /// Per-catalog cache of pipeline table handles, keyed by catalog name.
    tables: RwLock<HashMap<String, PipelineTableRef>>,
}

impl PipelineOperator {
    /// Builds the [`RegisterSystemTableRequest`] that creates the pipeline
    /// table under the given catalog, in the default private schema. The
    /// expression sets `create_if_not_exists`, so issuing it repeatedly is
    /// harmless.
    fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
        let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();

        let create_table_expr = CreateTableExpr {
            catalog_name: catalog.to_string(),
            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
            table_name: PIPELINE_TABLE_NAME.to_string(),
            desc: "GreptimeDB pipeline table for logs".to_string(),
            column_defs,
            time_index,
            primary_keys,
            create_if_not_exists: true,
            table_options: Default::default(),
            table_id: None,
            engine: default_engine().to_string(),
        };

        RegisterSystemTableRequest {
            create_table_expr,
            open_hook: None,
        }
    }

    /// Caches a [`PipelineTable`] handle for `catalog`. If another caller won
    /// the race and already populated the entry, the existing handle is kept.
    fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(catalog) {
            return;
        }

        tables.insert(
            catalog.to_string(),
            Arc::new(PipelineTable::new(
                self.inserter.clone(),
                self.statement_executor.clone(),
                table,
                self.query_engine.clone(),
            )),
        );
    }

    /// Ensures the pipeline table of the current catalog exists and is cached.
    /// Fast path: the cache already holds a handle. Otherwise the table is
    /// looked up in the catalog manager and, if still absent, created via the
    /// statement executor before being cached.
    async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
        let catalog = ctx.current_catalog();

        // Fast path: the table handle is already cached.
        if self.get_pipeline_table_from_cache(catalog).is_some() {
            return Ok(());
        }

        let RegisterSystemTableRequest {
            create_table_expr: mut expr,
            open_hook: _,
        } = self.create_table_request(catalog);

        // The table may already exist (e.g. created earlier or by another
        // node); if so, just cache the handle.
        if let Some(table) = self
            .catalog_manager
            .table(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.table_name,
                Some(&ctx),
            )
            .await
            .context(CatalogSnafu)?
        {
            self.add_pipeline_table_to_cache(catalog, table);
            return Ok(());
        }

        // Create the table, then fetch the handle back from the catalog.
        self.statement_executor
            .create_table_inner(&mut expr, None, ctx.clone())
            .await
            .context(CreateTableSnafu)?;

        let schema = &expr.schema_name;
        let table_name = &expr.table_name;
        let table = self
            .catalog_manager
            .table(catalog, schema, table_name, Some(&ctx))
            .await
            .context(CatalogSnafu)?
            .context(PipelineTableNotFoundSnafu)?;

        info!(
            "Created pipeline table {} with table id {}.",
            table.table_info().full_table_name(),
            table.table_info().table_id()
        );

        self.add_pipeline_table_to_cache(catalog, table);
        Ok(())
    }

    /// Returns the cached [`PipelineTableRef`] for `catalog`, if any. This does
    /// not create the table; see `create_pipeline_table_if_not_exists` for that.
    pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
        self.tables.read().unwrap().get(catalog).cloned()
    }
}

impl PipelineOperator {
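    /// Creates a [`PipelineOperator`] from the handles it needs to create,
    /// query, and write the pipeline table; the per-catalog cache starts empty.
    ///
    /// A wiring sketch (not compiled; where the four handles come from is
    /// deployment-specific and assumed here to be the surrounding server setup):
    ///
    /// ```ignore
    /// let pipeline_operator = Arc::new(PipelineOperator::new(
    ///     inserter.clone(),
    ///     statement_executor.clone(),
    ///     catalog_manager.clone(),
    ///     query_engine.clone(),
    /// ));
    /// ```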
    pub fn new(
        inserter: InserterRef,
        statement_executor: StatementExecutorRef,
        catalog_manager: CatalogManagerRef,
        query_engine: QueryEngineRef,
    ) -> Self {
        Self {
            inserter,
            statement_executor,
            catalog_manager,
            tables: RwLock::new(HashMap::new()),
            query_engine,
        }
    }
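
    /// Retrieves (and compiles) the pipeline named `name` at `version` from the
    /// current catalog's pipeline table, creating that table first if needed.
    /// Lookup latency is recorded in [`METRIC_PIPELINE_RETRIEVE_HISTOGRAM`],
    /// labeled by success or failure.
    ///
    /// A minimal usage sketch (not compiled; assumes an operator built via
    /// [`PipelineOperator::new`] and that `PipelineVersion` is an `Option`-like
    /// type where `None` selects the latest version):
    ///
    /// ```ignore
    /// let pipeline = operator
    ///     .get_pipeline(query_ctx.clone(), "my_pipeline", None)
    ///     .await?;
    /// ```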
    pub async fn get_pipeline(
        &self,
        query_ctx: QueryContextRef,
        name: &str,
        version: PipelineVersion,
    ) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
        let schema = query_ctx.current_schema();
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .get_pipeline(&schema, name, version)
            .inspect(|re| {
                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }
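
    /// Inserts a pipeline definition into the current catalog's pipeline table
    /// and compiles it, creating the table on first use. Insert latency is
    /// recorded in [`METRIC_PIPELINE_CREATE_HISTOGRAM`].
    ///
    /// A minimal usage sketch (not compiled; the name, content type, and
    /// `pipeline_yaml` definition string are illustrative):
    ///
    /// ```ignore
    /// let info = operator
    ///     .insert_pipeline("my_pipeline", "yaml", pipeline_yaml, query_ctx.clone())
    ///     .await?;
    /// ```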
    pub async fn insert_pipeline(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
        query_ctx: QueryContextRef,
    ) -> Result<PipelineInfo> {
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
            .inspect(|re| {
                METRIC_PIPELINE_CREATE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }
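
    /// Deletes the pipeline identified by `name` and `version` from the current
    /// catalog's pipeline table. Delete latency is recorded in
    /// [`METRIC_PIPELINE_DELETE_HISTOGRAM`]. The `Option<()>` result mirrors the
    /// underlying [`PipelineTable::delete_pipeline`] (presumably `Some(())` when
    /// a pipeline was actually removed).
    ///
    /// A minimal usage sketch (not compiled):
    ///
    /// ```ignore
    /// // `version` is obtained elsewhere, e.g. from the insert result.
    /// operator
    ///     .delete_pipeline("my_pipeline", version, query_ctx.clone())
    ///     .await?;
    /// ```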
    pub async fn delete_pipeline(
        &self,
        name: &str,
        version: PipelineVersion,
        query_ctx: QueryContextRef,
    ) -> Result<Option<()>> {
        self.create_pipeline_table_if_not_exists(query_ctx.clone())
            .await?;

        let timer = Instant::now();
        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
            .context(PipelineTableNotFoundSnafu)?
            .delete_pipeline(&query_ctx.current_schema(), name, version)
            .inspect(|re| {
                METRIC_PIPELINE_DELETE_HISTOGRAM
                    .with_label_values(&[&re.is_ok().to_string()])
                    .observe(timer.elapsed().as_secs_f64())
            })
            .await
    }
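
    /// Compiles a pipeline definition without touching storage; a thin wrapper
    /// over [`PipelineTable::compile_pipeline`], handy for validating a
    /// definition before persisting it with `insert_pipeline`.
    ///
    /// A dry-run sketch (not compiled; `pipeline_yaml` is an illustrative
    /// definition string):
    ///
    /// ```ignore
    /// let compiled = PipelineOperator::build_pipeline(pipeline_yaml)?;
    /// ```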
    pub fn build_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
        PipelineTable::compile_pipeline(pipeline)
    }
}