1use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17use std::time::Instant;
18
19use api::v1::CreateTableExpr;
20use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
21use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
22use common_telemetry::info;
23use datatypes::timestamp::TimestampNanosecond;
24use futures::FutureExt;
25use operator::insert::InserterRef;
26use operator::statement::StatementExecutorRef;
27use query::QueryEngineRef;
28use session::context::QueryContextRef;
29use snafu::{OptionExt, ResultExt};
30use table::TableRef;
31
32use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
33use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
34use crate::metrics::{
35 METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
36 METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
37};
38use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
39use crate::Pipeline;
40
41pub struct PipelineOperator {
50 inserter: InserterRef,
51 statement_executor: StatementExecutorRef,
52 catalog_manager: CatalogManagerRef,
53 query_engine: QueryEngineRef,
54 tables: RwLock<HashMap<String, PipelineTableRef>>,
55}
56
57impl PipelineOperator {
58 fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
60 let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
61
62 let create_table_expr = CreateTableExpr {
63 catalog_name: catalog.to_string(),
64 schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
65 table_name: PIPELINE_TABLE_NAME.to_string(),
66 desc: "GreptimeDB pipeline table for Log".to_string(),
67 column_defs,
68 time_index,
69 primary_keys,
70 create_if_not_exists: true,
71 table_options: Default::default(),
72 table_id: None, engine: default_engine().to_string(),
74 };
75
76 RegisterSystemTableRequest {
77 create_table_expr,
78 open_hook: None,
79 }
80 }
81
82 fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
83 let mut tables = self.tables.write().unwrap();
84 if tables.contains_key(catalog) {
85 return;
86 }
87 tables.insert(
88 catalog.to_string(),
89 Arc::new(PipelineTable::new(
90 self.inserter.clone(),
91 self.statement_executor.clone(),
92 table,
93 self.query_engine.clone(),
94 )),
95 );
96 }
97
98 async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
99 let catalog = ctx.current_catalog();
100
101 if self.get_pipeline_table_from_cache(catalog).is_some() {
103 return Ok(());
104 }
105
106 let RegisterSystemTableRequest {
107 create_table_expr: mut expr,
108 open_hook: _,
109 } = self.create_table_request(catalog);
110
111 if let Some(table) = self
113 .catalog_manager
114 .table(
115 &expr.catalog_name,
116 &expr.schema_name,
117 &expr.table_name,
118 Some(&ctx),
119 )
120 .await
121 .context(CatalogSnafu)?
122 {
123 self.add_pipeline_table_to_cache(catalog, table);
124 return Ok(());
125 }
126
127 self.statement_executor
129 .create_table_inner(&mut expr, None, ctx.clone())
130 .await
131 .context(CreateTableSnafu)?;
132
133 let schema = &expr.schema_name;
134 let table_name = &expr.table_name;
135
136 let table = self
138 .catalog_manager
139 .table(catalog, schema, table_name, Some(&ctx))
140 .await
141 .context(CatalogSnafu)?
142 .context(PipelineTableNotFoundSnafu)?;
143
144 info!(
145 "Created pipelines table {} with table id {}.",
146 table.table_info().full_table_name(),
147 table.table_info().table_id()
148 );
149
150 self.add_pipeline_table_to_cache(catalog, table);
152
153 Ok(())
154 }
155
156 pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
158 self.tables.read().unwrap().get(catalog).cloned()
159 }
160}
161
162impl PipelineOperator {
163 pub fn new(
165 inserter: InserterRef,
166 statement_executor: StatementExecutorRef,
167 catalog_manager: CatalogManagerRef,
168 query_engine: QueryEngineRef,
169 ) -> Self {
170 Self {
171 inserter,
172 statement_executor,
173 catalog_manager,
174 tables: RwLock::new(HashMap::new()),
175 query_engine,
176 }
177 }
178
179 pub async fn get_pipeline(
181 &self,
182 query_ctx: QueryContextRef,
183 name: &str,
184 version: PipelineVersion,
185 ) -> Result<Arc<Pipeline>> {
186 let schema = query_ctx.current_schema();
187 self.create_pipeline_table_if_not_exists(query_ctx.clone())
188 .await?;
189
190 let timer = Instant::now();
191 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
192 .context(PipelineTableNotFoundSnafu)?
193 .get_pipeline(&schema, name, version)
194 .inspect(|re| {
195 METRIC_PIPELINE_RETRIEVE_HISTOGRAM
196 .with_label_values(&[&re.is_ok().to_string()])
197 .observe(timer.elapsed().as_secs_f64())
198 })
199 .await
200 }
201
202 pub async fn get_pipeline_str(
204 &self,
205 name: &str,
206 version: PipelineVersion,
207 query_ctx: QueryContextRef,
208 ) -> Result<(String, TimestampNanosecond)> {
209 let schema = query_ctx.current_schema();
210 self.create_pipeline_table_if_not_exists(query_ctx.clone())
211 .await?;
212
213 let timer = Instant::now();
214 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
215 .context(PipelineTableNotFoundSnafu)?
216 .get_pipeline_str(&schema, name, version)
217 .inspect(|re| {
218 METRIC_PIPELINE_RETRIEVE_HISTOGRAM
219 .with_label_values(&[&re.is_ok().to_string()])
220 .observe(timer.elapsed().as_secs_f64())
221 })
222 .await
223 }
224
225 pub async fn insert_pipeline(
227 &self,
228 name: &str,
229 content_type: &str,
230 pipeline: &str,
231 query_ctx: QueryContextRef,
232 ) -> Result<PipelineInfo> {
233 self.create_pipeline_table_if_not_exists(query_ctx.clone())
234 .await?;
235
236 let timer = Instant::now();
237 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
238 .context(PipelineTableNotFoundSnafu)?
239 .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
240 .inspect(|re| {
241 METRIC_PIPELINE_CREATE_HISTOGRAM
242 .with_label_values(&[&re.is_ok().to_string()])
243 .observe(timer.elapsed().as_secs_f64())
244 })
245 .await
246 }
247
248 pub async fn delete_pipeline(
250 &self,
251 name: &str,
252 version: PipelineVersion,
253 query_ctx: QueryContextRef,
254 ) -> Result<Option<()>> {
255 self.create_pipeline_table_if_not_exists(query_ctx.clone())
257 .await?;
258
259 let timer = Instant::now();
260 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
261 .context(PipelineTableNotFoundSnafu)?
262 .delete_pipeline(&query_ctx.current_schema(), name, version)
263 .inspect(|re| {
264 METRIC_PIPELINE_DELETE_HISTOGRAM
265 .with_label_values(&[&re.is_ok().to_string()])
266 .observe(timer.elapsed().as_secs_f64())
267 })
268 .await
269 }
270
271 pub fn build_pipeline(pipeline: &str) -> Result<Pipeline> {
273 PipelineTable::compile_pipeline(pipeline)
274 }
275}