1use std::sync::{Arc, Weak};
16
17use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_meta::ddl::create_flow::FlowType;
20use common_meta::key::FlowId;
21use common_meta::key::flow::FlowMetadataManager;
22use common_meta::key::flow::flow_info::FlowInfoValue;
23use common_meta::key::flow::flow_state::FlowStat;
24use common_recordbatch::adapter::RecordBatchStreamAdapter;
25use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
26use datafusion::execution::TaskContext;
27use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
28use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
29use datatypes::prelude::ConcreteDataType as CDT;
30use datatypes::scalars::ScalarVectorBuilder;
31use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
32use datatypes::timestamp::TimestampMillisecond;
33use datatypes::value::Value;
34use datatypes::vectors::{
35 Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
36 UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
37};
38use futures::TryStreamExt;
39use snafu::{OptionExt, ResultExt};
40use sql::ast::Ident;
41use sql::dialect::GreptimeDbDialect;
42use sql::parser::ParserContext;
43use sql::statements::create::{CreateFlow, SqlOrTql};
44use sql::statements::statement::Statement;
45use store_api::storage::{ScanRequest, TableId};
46
47use crate::CatalogManager;
48use crate::error::{
49 CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
50 Result, UpgradeWeakCatalogManagerRefSnafu,
51};
52use crate::information_schema::{FLOWS, Predicates};
53use crate::system_schema::information_schema::InformationTable;
54use crate::system_schema::utils;
55
// Initial capacity hint for the per-column vector builders; merely a
// preallocation — rows beyond this grow the builders normally.
const INIT_CAPACITY: usize = 42;

// Column names of the `information_schema.flows` table. These constants are
// used both when declaring the schema and when evaluating scan predicates.
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
pub const FLOWNODE_ADDRS: &str = "flownode_addrs";
76
/// Virtual table `information_schema.flows`: one row per flow registered in
/// the catalog, materialized on demand from flow metadata.
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    // Fixed schema built by `Self::schema()`.
    schema: SchemaRef,
    // Catalog whose flows this table instance lists.
    catalog_name: String,
    // Weak to avoid a reference cycle with the catalog manager that owns this table.
    catalog_manager: Weak<dyn CatalogManager>,
    // Source of flow names, flow info, and flownode addresses.
    flow_metadata_manager: Arc<FlowMetadataManager>,
}
85
impl InformationSchemaFlows {
    /// Creates the `flows` table view for `catalog_name`, backed by the given
    /// catalog and flow-metadata managers.
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
        }
    }

    /// Builds the fixed schema of `information_schema.flows`.
    ///
    /// NOTE: the column order here must stay in sync with the column order
    /// produced by `InformationSchemaFlowsBuilder::finish`.
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(
            // (column name, data type, is_nullable)
            vec![
                (FLOW_NAME, CDT::string_datatype(), false),
                (FLOW_ID, CDT::uint32_datatype(), false),
                (STATE_SIZE, CDT::uint64_datatype(), true),
                (TABLE_CATALOG, CDT::string_datatype(), false),
                (FLOW_DEFINITION, CDT::string_datatype(), false),
                (COMMENT, CDT::string_datatype(), true),
                (EXPIRE_AFTER, CDT::int64_datatype(), true),
                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
                (SINK_TABLE_NAME, CDT::string_datatype(), false),
                (FLOWNODE_IDS, CDT::string_datatype(), true),
                (OPTIONS, CDT::string_datatype(), true),
                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (
                    LAST_EXECUTION_TIME,
                    CDT::timestamp_millisecond_datatype(),
                    true,
                ),
                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
                (FLOWNODE_ADDRS, CDT::string_datatype(), true),
            ]
            .into_iter()
            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
            .collect(),
        ))
    }

    /// Reconstructs a `CREATE FLOW` statement string from stored flow
    /// metadata (used as the `flow_definition` column value).
    ///
    /// Parse failures of the stored raw SQL are surfaced as internal errors.
    pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result<String> {
        let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql())
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        let query = parser_ctx
            .parse_statement()
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        // TQL statements keep the original raw text; everything else is
        // re-rendered from the parsed statement. Presumably `to_string()` does
        // not round-trip TQL faithfully — NOTE(review): confirm.
        let raw_query = match &query {
            Statement::Tql(_) => flow_info.raw_sql().clone(),
            _ => query.to_string(),
        };

        let query = Box::new(
            SqlOrTql::try_from_statement(query, &raw_query)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        );

        // An empty stored comment is rendered as "no COMMENT clause".
        let comment = if flow_info.comment().is_empty() {
            None
        } else {
            Some(flow_info.comment().clone())
        };

        // The regenerated DDL is always emitted as `CREATE FLOW IF NOT EXISTS`
        // (never `OR REPLACE`), regardless of how the flow was originally created.
        let stmt = CreateFlow {
            flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]),
            sink_table_name: sql::ast::ObjectName::from(vec![
                Ident::new(&flow_info.sink_table_name().schema_name),
                Ident::new(&flow_info.sink_table_name().table_name),
            ]),
            or_replace: false,
            if_not_exists: true,
            expire_after: flow_info.expire_after(),
            eval_interval: flow_info.eval_interval(),
            comment,
            // The internal flow-type key is filtered out of the user-visible options.
            flow_options: sql::statements::OptionMap::from_filtered_string_map(
                flow_info.options(),
                &[FlowType::FLOW_TYPE_KEY],
            ),
            query,
        };

        Ok(stmt.to_string())
    }

    /// Creates a fresh row builder sharing this table's schema and managers.
    fn builder(&self) -> InformationSchemaFlowsBuilder {
        InformationSchemaFlowsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            &self.flow_metadata_manager,
        )
    }
}
191
192impl InformationTable for InformationSchemaFlows {
193 fn table_id(&self) -> TableId {
194 INFORMATION_SCHEMA_FLOW_TABLE_ID
195 }
196
197 fn table_name(&self) -> &'static str {
198 FLOWS
199 }
200
201 fn schema(&self) -> SchemaRef {
202 self.schema.clone()
203 }
204
205 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
206 let schema = self.schema.arrow_schema().clone();
207 let mut builder = self.builder();
208 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
209 schema,
210 futures::stream::once(async move {
211 builder
212 .make_flows(Some(request))
213 .await
214 .map(|x| x.into_df_record_batch())
215 .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
216 }),
217 ));
218 Ok(Box::pin(
219 RecordBatchStreamAdapter::try_new(stream)
220 .map_err(BoxedError::new)
221 .context(InternalSnafu)?,
222 ))
223 }
224}
225
/// Accumulates one row per flow into per-column vector builders and finally
/// assembles them into a single `RecordBatch` (see `finish`).
struct InformationSchemaFlowsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,

    // One builder per output column; `add_flow` pushes exactly one value into
    // each of these per accepted row.
    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    // JSON-encoded groups (ids/addresses/options serialized with serde_json).
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
    flownode_addr_groups: StringVectorBuilder,
}
252
impl InformationSchemaFlowsBuilder {
    /// Creates an empty builder; every column builder is preallocated with
    /// `INIT_CAPACITY` slots.
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_addr_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Streams all flow names of the catalog, fetches each flow's metadata and
    /// appends a row for it, then assembles the final `RecordBatch`.
    ///
    /// `request` (when present) supplies predicates used to skip rows early.
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        // Runtime stats (state sizes, last execution times) are fetched once
        // up front and looked up per flow id while iterating.
        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            // A name without matching flow info is treated as an error, not
            // silently skipped.
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .with_context(|| FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.clone(),
                    flow_name: flow_name.clone(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

    /// Appends one row for `flow_info`, unless the scan predicates reject it.
    ///
    /// Only `flow_name`, `flow_id` and `table_catalog` participate in
    /// predicate evaluation; all other columns are computed only for rows that
    /// pass the filter.
    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().clone()),
            ),
        ];
        if !predicates.eval(&row) {
            return Ok(());
        }
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        // NULL when no stats are available or the flow has no recorded state size.
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        // `flow_definition` is the regenerated CREATE FLOW text, not the raw SQL.
        self.raw_sqls
            .push(Some(&InformationSchemaFlows::generate_show_create_flow(
                &flow_info,
            )?));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        // Id/option groups are stored as JSON strings.
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context({
                JsonSnafu {
                    input: format!("{:?}", flow_info.flownode_ids()),
                }
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        // NULL until the flow has executed at least once (per the stats map).
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));
        let flownode_addrs = self
            .flow_metadata_manager
            .flownode_addrs(flow_id)
            .await
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;
        // NULL rather than an empty JSON list when no flownode addresses are known.
        if flownode_addrs.is_empty() {
            self.flownode_addr_groups.push(None);
        } else {
            let flownode_addrs_json =
                serde_json::to_string(&flownode_addrs).with_context(|_| JsonSnafu {
                    input: format!("{:?}", flownode_addrs),
                })?;
            self.flownode_addr_groups.push(Some(&flownode_addrs_json));
        }

        // Resolve source table ids to full names; ids that no longer resolve
        // are silently omitted from the list.
        let mut source_table_names = vec![];
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for table_id in flow_info.source_table_ids() {
            if let Some(table_info) = catalog_manager.table_info_by_id(*table_id).await? {
                source_table_names.push(table_info.full_table_name());
            }
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }

    /// Finalizes all column builders into a `RecordBatch`.
    ///
    /// NOTE: the column order here must match `InformationSchemaFlows::schema`.
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
            Arc::new(self.flownode_addr_groups.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}
446
447impl DfPartitionStream for InformationSchemaFlows {
448 fn schema(&self) -> &arrow_schema::SchemaRef {
449 self.schema.arrow_schema()
450 }
451
452 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
453 let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
454 let mut builder = self.builder();
455 Box::pin(DfRecordBatchStreamAdapter::new(
456 schema,
457 futures::stream::once(async move {
458 builder
459 .make_flows(None)
460 .await
461 .map(|x| x.into_df_record_batch())
462 .map_err(Into::into)
463 }),
464 ))
465 }
466}