1use std::sync::{Arc, Weak};
16
17use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_meta::key::FlowId;
20use common_meta::key::flow::FlowMetadataManager;
21use common_meta::key::flow::flow_info::FlowInfoValue;
22use common_meta::key::flow::flow_state::FlowStat;
23use common_recordbatch::adapter::RecordBatchStreamAdapter;
24use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datatypes::prelude::ConcreteDataType as CDT;
29use datatypes::scalars::ScalarVectorBuilder;
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::timestamp::TimestampMillisecond;
32use datatypes::value::Value;
33use datatypes::vectors::{
34 Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
35 UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
36};
37use futures::TryStreamExt;
38use snafu::{OptionExt, ResultExt};
39use sql::ast::Ident;
40use sql::dialect::GreptimeDbDialect;
41use sql::parser::ParserContext;
42use sql::statements::create::{CreateFlow, SqlOrTql};
43use sql::statements::statement::Statement;
44use store_api::storage::{ScanRequest, TableId};
45
46use crate::CatalogManager;
47use crate::error::{
48 CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
49 Result, UpgradeWeakCatalogManagerRefSnafu,
50};
51use crate::information_schema::{FLOWS, Predicates};
52use crate::system_schema::information_schema::InformationTable;
53use crate::system_schema::utils;
54
// Initial capacity hint for the per-column vector builders; rows beyond this
// simply trigger a reallocation.
const INIT_CAPACITY: usize = 42;

// Column names of the `information_schema.flows` table.
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
74
/// The `information_schema.flows` virtual table: lists every flow registered
/// under one catalog, with rows built on demand from flow metadata.
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    /// Schema of the virtual table, built once via [`Self::schema`].
    schema: SchemaRef,
    /// Catalog whose flows this table instance exposes.
    catalog_name: String,
    /// Weak handle back to the catalog manager (upgraded on demand), avoiding
    /// a reference cycle between catalog and system tables.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Accessor for flow names and flow info stored in the metadata backend.
    flow_metadata_manager: Arc<FlowMetadataManager>,
}
83
impl InformationSchemaFlows {
    /// Creates the `flows` virtual table for `catalog_name`.
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
        }
    }

    /// Builds the table schema from `(name, data type, nullable)` triples.
    /// The order here defines the column order of every record batch the
    /// builder emits, so it must stay in sync with
    /// `InformationSchemaFlowsBuilder::finish`.
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(
            vec![
                (FLOW_NAME, CDT::string_datatype(), false),
                (FLOW_ID, CDT::uint32_datatype(), false),
                (STATE_SIZE, CDT::uint64_datatype(), true),
                (TABLE_CATALOG, CDT::string_datatype(), false),
                (FLOW_DEFINITION, CDT::string_datatype(), false),
                (COMMENT, CDT::string_datatype(), true),
                (EXPIRE_AFTER, CDT::int64_datatype(), true),
                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
                (SINK_TABLE_NAME, CDT::string_datatype(), false),
                (FLOWNODE_IDS, CDT::string_datatype(), true),
                (OPTIONS, CDT::string_datatype(), true),
                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (
                    LAST_EXECUTION_TIME,
                    CDT::timestamp_millisecond_datatype(),
                    true,
                ),
                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
            ]
            .into_iter()
            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
            .collect(),
        ))
    }

    /// Reconstructs a `CREATE FLOW` statement (as SQL text) from stored flow
    /// metadata by re-parsing the stored raw SQL and rendering a fresh
    /// [`CreateFlow`] statement.
    ///
    /// # Errors
    /// Returns `InternalSnafu`-wrapped errors if the stored SQL fails to
    /// parse or cannot be converted into a [`SqlOrTql`] query.
    pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result<String> {
        let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql())
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        let query = parser_ctx
            .parse_statement()
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        // TQL statements keep the stored raw SQL verbatim; everything else
        // uses the parsed statement's Display output (presumably a normalized
        // form — NOTE(review): confirm TQL Display is not round-trippable).
        let raw_query = match &query {
            Statement::Tql(_) => flow_info.raw_sql().clone(),
            _ => query.to_string(),
        };

        let query = Box::new(
            SqlOrTql::try_from_statement(query, &raw_query)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        );

        // An empty stored comment is rendered as "no comment clause".
        let comment = if flow_info.comment().is_empty() {
            None
        } else {
            Some(flow_info.comment().clone())
        };

        // NOTE(review): the regenerated statement always uses
        // `or_replace: false` / `if_not_exists: true`, regardless of the
        // options used when the flow was originally created.
        let stmt = CreateFlow {
            flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]),
            sink_table_name: sql::ast::ObjectName::from(vec![
                Ident::new(&flow_info.sink_table_name().schema_name),
                Ident::new(&flow_info.sink_table_name().table_name),
            ]),
            or_replace: false,
            if_not_exists: true,
            expire_after: flow_info.expire_after(),
            eval_interval: flow_info.eval_interval(),
            comment,
            query,
        };

        Ok(stmt.to_string())
    }

    /// Creates a fresh row builder sharing this table's schema and managers.
    fn builder(&self) -> InformationSchemaFlowsBuilder {
        InformationSchemaFlowsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            &self.flow_metadata_manager,
        )
    }
}
183
184impl InformationTable for InformationSchemaFlows {
185 fn table_id(&self) -> TableId {
186 INFORMATION_SCHEMA_FLOW_TABLE_ID
187 }
188
189 fn table_name(&self) -> &'static str {
190 FLOWS
191 }
192
193 fn schema(&self) -> SchemaRef {
194 self.schema.clone()
195 }
196
197 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
198 let schema = self.schema.arrow_schema().clone();
199 let mut builder = self.builder();
200 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
201 schema,
202 futures::stream::once(async move {
203 builder
204 .make_flows(Some(request))
205 .await
206 .map(|x| x.into_df_record_batch())
207 .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
208 }),
209 ));
210 Ok(Box::pin(
211 RecordBatchStreamAdapter::try_new(stream)
212 .map_err(BoxedError::new)
213 .context(InternalSnafu)?,
214 ))
215 }
216}
217
/// Accumulates one row per flow into per-column vector builders, then emits a
/// single [`RecordBatch`] via `finish`.
struct InformationSchemaFlowsBuilder {
    /// Target schema; the column builders below must match its column order.
    schema: SchemaRef,
    /// Catalog whose flows are listed.
    catalog_name: String,
    /// Upgraded on demand to resolve source table ids to names.
    catalog_manager: Weak<dyn CatalogManager>,
    /// Source of flow names and flow info.
    flow_metadata_manager: Arc<FlowMetadataManager>,

    // One builder per output column, in schema order.
    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
}
243
impl InformationSchemaFlowsBuilder {
    /// Creates an empty builder; every column builder pre-reserves
    /// `INIT_CAPACITY` slots.
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Scans every flow of `self.catalog_name` and materializes them into a
    /// single record batch. `request` (if any) supplies predicates used to
    /// skip rows before their columns are built.
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        // Stream of (flow name, flow id) pairs registered under the catalog.
        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        // Runtime statistics (state size, last execution time) are fetched
        // once up front and may be absent (`None`).
        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            // A listed flow name without matching info is an error rather
            // than a silently skipped row.
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .with_context(|| FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.clone(),
                    flow_name: flow_name.clone(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

    /// Appends one row for `flow_info` to the column builders, unless the
    /// scan predicates reject it.
    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        // Predicate pushdown is evaluated over these three columns only;
        // filters on other columns are not applied here.
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().clone()),
            ),
        ];
        if !predicates.eval(&row) {
            return Ok(());
        }
        // NOTE: the push order below must stay in sync with the column order
        // in `InformationSchemaFlows::schema` and in `finish`.
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        // State size is only present when runtime stats were available.
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        // The definition column carries a regenerated `CREATE FLOW` text.
        self.raw_sqls
            .push(Some(&InformationSchemaFlows::generate_show_create_flow(
                &flow_info,
            )?));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        // Id/node/option collections are serialized as JSON strings.
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context({
                JsonSnafu {
                    input: format!("{:?}", flow_info.flownode_ids()),
                }
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));

        // Resolve source table ids to full table names; ids that no longer
        // resolve to a table are silently omitted from the list.
        let mut source_table_names = vec![];
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for table_id in flow_info.source_table_ids() {
            if let Some(table_info) = catalog_manager.table_info_by_id(*table_id).await? {
                source_table_names.push(table_info.full_table_name());
            }
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }

    /// Finalizes every column builder into a vector and assembles the record
    /// batch; the vector order here must match `InformationSchemaFlows::schema`.
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}
420
421impl DfPartitionStream for InformationSchemaFlows {
422 fn schema(&self) -> &arrow_schema::SchemaRef {
423 self.schema.arrow_schema()
424 }
425
426 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
427 let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
428 let mut builder = self.builder();
429 Box::pin(DfRecordBatchStreamAdapter::new(
430 schema,
431 futures::stream::once(async move {
432 builder
433 .make_flows(None)
434 .await
435 .map(|x| x.into_df_record_batch())
436 .map_err(Into::into)
437 }),
438 ))
439 }
440}