1use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17use std::time::Instant;
18
19use api::v1::CreateTableExpr;
20use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
21use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
22use common_telemetry::info;
23use common_time::FOREVER;
24use datatypes::timestamp::TimestampNanosecond;
25use futures::FutureExt;
26use operator::insert::InserterRef;
27use operator::statement::StatementExecutorRef;
28use query::QueryEngineRef;
29use session::context::QueryContextRef;
30use snafu::{OptionExt, ResultExt};
31use table::TableRef;
32use table::requests::TTL_KEY;
33
34use crate::Pipeline;
35use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
36use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
37use crate::metrics::{
38 METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
39 METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
40};
41use crate::table::{PIPELINE_TABLE_NAME, PipelineTable};
42
43pub struct PipelineOperator {
52 inserter: InserterRef,
53 statement_executor: StatementExecutorRef,
54 catalog_manager: CatalogManagerRef,
55 query_engine: QueryEngineRef,
56 tables: RwLock<HashMap<String, PipelineTableRef>>,
57}
58
59impl PipelineOperator {
60 fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
62 let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
63
64 let mut table_options = HashMap::new();
65 table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
66
67 let create_table_expr = CreateTableExpr {
68 catalog_name: catalog.to_string(),
69 schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
70 table_name: PIPELINE_TABLE_NAME.to_string(),
71 desc: "GreptimeDB pipeline table for Log".to_string(),
72 column_defs,
73 time_index,
74 primary_keys,
75 create_if_not_exists: true,
76 table_options,
77 table_id: None, engine: default_engine().to_string(),
79 };
80
81 RegisterSystemTableRequest {
82 create_table_expr,
83 open_hook: None,
84 }
85 }
86
87 fn add_pipeline_table_to_cache(&self, catalog: &str, table: TableRef) {
88 let mut tables = self.tables.write().unwrap();
89 if tables.contains_key(catalog) {
90 return;
91 }
92 tables.insert(
93 catalog.to_string(),
94 Arc::new(PipelineTable::new(
95 self.inserter.clone(),
96 self.statement_executor.clone(),
97 table,
98 self.query_engine.clone(),
99 )),
100 );
101 }
102
103 async fn create_pipeline_table_if_not_exists(&self, ctx: QueryContextRef) -> Result<()> {
104 let catalog = ctx.current_catalog();
105
106 if self.get_pipeline_table_from_cache(catalog).is_some() {
108 return Ok(());
109 }
110
111 let RegisterSystemTableRequest {
112 create_table_expr: mut expr,
113 open_hook: _,
114 } = self.create_table_request(catalog);
115
116 if let Some(table) = self
118 .catalog_manager
119 .table(
120 &expr.catalog_name,
121 &expr.schema_name,
122 &expr.table_name,
123 Some(&ctx),
124 )
125 .await
126 .context(CatalogSnafu)?
127 {
128 self.add_pipeline_table_to_cache(catalog, table);
129 return Ok(());
130 }
131
132 self.statement_executor
134 .create_table_inner(&mut expr, None, ctx.clone())
135 .await
136 .context(CreateTableSnafu)?;
137
138 let schema = &expr.schema_name;
139 let table_name = &expr.table_name;
140
141 let table = self
143 .catalog_manager
144 .table(catalog, schema, table_name, Some(&ctx))
145 .await
146 .context(CatalogSnafu)?
147 .context(PipelineTableNotFoundSnafu)?;
148
149 info!(
150 "Created pipelines table {} with table id {}.",
151 table.table_info().full_table_name(),
152 table.table_info().table_id()
153 );
154
155 self.add_pipeline_table_to_cache(catalog, table);
157
158 Ok(())
159 }
160
161 pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
163 self.tables.read().unwrap().get(catalog).cloned()
164 }
165}
166
167impl PipelineOperator {
168 pub fn new(
170 inserter: InserterRef,
171 statement_executor: StatementExecutorRef,
172 catalog_manager: CatalogManagerRef,
173 query_engine: QueryEngineRef,
174 ) -> Self {
175 Self {
176 inserter,
177 statement_executor,
178 catalog_manager,
179 tables: RwLock::new(HashMap::new()),
180 query_engine,
181 }
182 }
183
184 pub async fn get_pipeline(
186 &self,
187 query_ctx: QueryContextRef,
188 name: &str,
189 version: PipelineVersion,
190 ) -> Result<Arc<Pipeline>> {
191 let schema = query_ctx.current_schema();
192 self.create_pipeline_table_if_not_exists(query_ctx.clone())
193 .await?;
194
195 let timer = Instant::now();
196 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
197 .context(PipelineTableNotFoundSnafu)?
198 .get_pipeline(&schema, name, version)
199 .inspect(|re| {
200 METRIC_PIPELINE_RETRIEVE_HISTOGRAM
201 .with_label_values(&[&re.is_ok().to_string()])
202 .observe(timer.elapsed().as_secs_f64())
203 })
204 .await
205 }
206
207 pub async fn get_pipeline_str(
209 &self,
210 name: &str,
211 version: PipelineVersion,
212 query_ctx: QueryContextRef,
213 ) -> Result<(String, TimestampNanosecond)> {
214 let schema = query_ctx.current_schema();
215 self.create_pipeline_table_if_not_exists(query_ctx.clone())
216 .await?;
217
218 let timer = Instant::now();
219 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
220 .context(PipelineTableNotFoundSnafu)?
221 .get_pipeline_str(&schema, name, version)
222 .inspect(|re| {
223 METRIC_PIPELINE_RETRIEVE_HISTOGRAM
224 .with_label_values(&[&re.is_ok().to_string()])
225 .observe(timer.elapsed().as_secs_f64())
226 })
227 .await
228 .map(|p| (p.content, p.version))
229 }
230
231 pub async fn insert_pipeline(
233 &self,
234 name: &str,
235 content_type: &str,
236 pipeline: &str,
237 query_ctx: QueryContextRef,
238 ) -> Result<PipelineInfo> {
239 self.create_pipeline_table_if_not_exists(query_ctx.clone())
240 .await?;
241
242 let timer = Instant::now();
243 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
244 .context(PipelineTableNotFoundSnafu)?
245 .insert_and_compile(name, content_type, pipeline)
246 .inspect(|re| {
247 METRIC_PIPELINE_CREATE_HISTOGRAM
248 .with_label_values(&[&re.is_ok().to_string()])
249 .observe(timer.elapsed().as_secs_f64())
250 })
251 .await
252 }
253
254 pub async fn delete_pipeline(
256 &self,
257 name: &str,
258 version: PipelineVersion,
259 query_ctx: QueryContextRef,
260 ) -> Result<Option<()>> {
261 self.create_pipeline_table_if_not_exists(query_ctx.clone())
263 .await?;
264
265 let timer = Instant::now();
266 self.get_pipeline_table_from_cache(query_ctx.current_catalog())
267 .context(PipelineTableNotFoundSnafu)?
268 .delete_pipeline(name, version)
269 .inspect(|re| {
270 METRIC_PIPELINE_DELETE_HISTOGRAM
271 .with_label_values(&[&re.is_ok().to_string()])
272 .observe(timer.elapsed().as_secs_f64())
273 })
274 .await
275 }
276
277 pub fn build_pipeline(pipeline: &str) -> Result<Pipeline> {
279 PipelineTable::compile_pipeline(pipeline)
280 }
281}