use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
    UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;

const INIT_CAPACITY: usize = 42;

pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";

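/// The `information_schema.flows` virtual table: one row per flow in the
/// catalog. It backs queries such as:
///
/// ```sql
/// SELECT flow_name, state_size FROM INFORMATION_SCHEMA.FLOWS;
/// ```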
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
}

impl InformationSchemaFlows {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
        }
    }

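    /// Builds the schema for the `flows` table. The complex fields
    /// (`source_table_ids`, `flownode_ids`, `options`) are serialized as JSON
    /// strings for now.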
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(
            vec![
                (FLOW_NAME, CDT::string_datatype(), false),
                (FLOW_ID, CDT::uint32_datatype(), false),
                (STATE_SIZE, CDT::uint64_datatype(), true),
                (TABLE_CATALOG, CDT::string_datatype(), false),
                (FLOW_DEFINITION, CDT::string_datatype(), false),
                (COMMENT, CDT::string_datatype(), true),
                (EXPIRE_AFTER, CDT::int64_datatype(), true),
                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
                (SINK_TABLE_NAME, CDT::string_datatype(), false),
                (FLOWNODE_IDS, CDT::string_datatype(), true),
                (OPTIONS, CDT::string_datatype(), true),
                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (
                    LAST_EXECUTION_TIME,
                    CDT::timestamp_millisecond_datatype(),
                    true,
                ),
                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
            ]
            .into_iter()
            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
            .collect(),
        ))
    }

    fn builder(&self) -> InformationSchemaFlowsBuilder {
        InformationSchemaFlowsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            &self.flow_metadata_manager,
        )
    }
}

impl InformationTable for InformationSchemaFlows {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_FLOW_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        FLOWS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

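/// Builds the `information_schema.flows` table row by row; column values are
/// extracted from [`FlowInfoValue`] and, when available, [`FlowStat`].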
struct InformationSchemaFlowsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,

    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
}

impl InformationSchemaFlowsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

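    /// Constructs the `information_schema.flows` record batch: lists all flow
    /// names under the catalog, fetches each flow's metadata and runtime
    /// stats, and appends one row per flow that passes the scan predicates.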
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .context(FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.to_string(),
                    flow_name: flow_name.to_string(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

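    /// Appends a single row for `flow_id`, unless the scan predicates filter
    /// it out. Complex fields are JSON-encoded, and source table ids are
    /// resolved to full table names through the catalog manager.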
    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().to_string()),
            ),
        ];
        if !predicates.eval(&row) {
            return Ok(());
        }
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        self.raw_sqls.push(Some(flow_info.raw_sql()));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.flownode_ids()),
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));

        let mut source_table_names = vec![];
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            source_table_names.extend(
                catalog_manager
                    .tables_by_ids(&catalog_name, &schema_name, flow_info.source_table_ids())
                    .await?
                    .into_iter()
                    .map(|table| table.table_info().full_table_name()),
            );
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }

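    /// Consumes the column builders and assembles the final [`RecordBatch`].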
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaFlows {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}