catalog/system_schema/information_schema/flows.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
    UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use sql::ast::Ident;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, SqlOrTql};
use sql::statements::statement::Statement;
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{
    CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{FLOWS, Predicates};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;

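/// Initial capacity hint for the per-column vector builders in
/// [`InformationSchemaFlowsBuilder`] below.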
const INIT_CAPACITY: usize = 42;

// Column names of `information_schema.flows`.
// The primary key is (flow_name, flow_id, table_catalog).
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
pub const FLOWNODE_ADDRS: &str = "flownode_addrs";

/// The `information_schema.flows` table provides information about flows in databases.
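///
/// A hedged usage sketch (illustrative only; the rows returned depend on the
/// flows defined in the cluster):
///
/// ```sql
/// SELECT flow_name, flow_id, state_size, sink_table_name
/// FROM information_schema.flows;
/// ```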
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
}

impl InformationSchemaFlows {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
        }
    }

    /// Complex fields (including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`], [`OPTIONS`] and
    /// [`FLOWNODE_ADDRS`]) are serialized as JSON strings for now.
    /// TODO(discord9): use a better way to store complex fields like json type
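    ///
    /// Note: the column order defined here must match the vector order in
    /// [`InformationSchemaFlowsBuilder::finish`], since the record batch pairs
    /// them positionally.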
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(
            vec![
                (FLOW_NAME, CDT::string_datatype(), false),
                (FLOW_ID, CDT::uint32_datatype(), false),
                (STATE_SIZE, CDT::uint64_datatype(), true),
                (TABLE_CATALOG, CDT::string_datatype(), false),
                (FLOW_DEFINITION, CDT::string_datatype(), false),
                (COMMENT, CDT::string_datatype(), true),
                (EXPIRE_AFTER, CDT::int64_datatype(), true),
                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
                (SINK_TABLE_NAME, CDT::string_datatype(), false),
                (FLOWNODE_IDS, CDT::string_datatype(), true),
                (OPTIONS, CDT::string_datatype(), true),
                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (
                    LAST_EXECUTION_TIME,
                    CDT::timestamp_millisecond_datatype(),
                    true,
                ),
                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
                (FLOWNODE_ADDRS, CDT::string_datatype(), true),
            ]
            .into_iter()
            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
            .collect(),
        ))
    }

    /// Generates the `CREATE FLOW` statement for the `flow_definition` column.
    pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result<String> {
        let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql())
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        let query = parser_ctx
            .parse_statement()
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;

        let raw_query = match &query {
            Statement::Tql(_) => flow_info.raw_sql().clone(),
            _ => query.to_string(),
        };

        let query = Box::new(
            SqlOrTql::try_from_statement(query, &raw_query)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        );

        let comment = if flow_info.comment().is_empty() {
            None
        } else {
            Some(flow_info.comment().clone())
        };

        let stmt = CreateFlow {
            flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]),
            sink_table_name: sql::ast::ObjectName::from(vec![
                Ident::new(&flow_info.sink_table_name().schema_name),
                Ident::new(&flow_info.sink_table_name().table_name),
            ]),
            or_replace: false,
            if_not_exists: true,
            expire_after: flow_info.expire_after(),
            eval_interval: flow_info.eval_interval(),
            comment,
            flow_options: sql::statements::OptionMap::from_filtered_string_map(
                flow_info.options(),
                &[FlowType::FLOW_TYPE_KEY],
            ),
            query,
        };

        Ok(stmt.to_string())
    }
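
    // A hedged illustration of what `generate_show_create_flow` renders,
    // assuming a hypothetical flow `my_flow` that aggregates a source table
    // into `my_schema.my_sink` (the real output depends on the stored
    // `FlowInfoValue` and on `CreateFlow`'s `Display` impl):
    //
    //   CREATE FLOW IF NOT EXISTS my_flow
    //   SINK TO my_schema.my_sink
    //   AS SELECT count(*) FROM my_source;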

    fn builder(&self) -> InformationSchemaFlowsBuilder {
        InformationSchemaFlowsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            &self.flow_metadata_manager,
        )
    }
}

impl InformationTable for InformationSchemaFlows {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_FLOW_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        FLOWS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Builds the `information_schema.FLOWS` table row by row
///
/// columns are based on [`FlowInfoValue`]
struct InformationSchemaFlowsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,

    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
    flownode_addr_groups: StringVectorBuilder,
}

impl InformationSchemaFlowsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_addr_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.flows` virtual table
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        // TODO(discord9): use `AsyncIterator` once it's stable-ish
        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .with_context(|| FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.clone(),
                    flow_name: flow_name.clone(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().clone()),
            ),
        ];
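        // Evaluate pushed-down predicates against the key columns
        // (flow_name, flow_id, table_catalog) and skip flows that cannot match.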
        if !predicates.eval(&row) {
            return Ok(());
        }
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        self.raw_sqls
            .push(Some(&InformationSchemaFlows::generate_show_create_flow(
                &flow_info,
            )?));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context({
                JsonSnafu {
                    input: format!("{:?}", flow_info.flownode_ids()),
                }
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));
        let flownode_addrs = self
            .flow_metadata_manager
            .flownode_addrs(flow_id)
            .await
            .map_err(BoxedError::new)
            .context(InternalSnafu)?;
        if flownode_addrs.is_empty() {
            self.flownode_addr_groups.push(None);
        } else {
            let flownode_addrs_json =
                serde_json::to_string(&flownode_addrs).with_context(|_| JsonSnafu {
                    input: format!("{:?}", flownode_addrs),
                })?;
            self.flownode_addr_groups.push(Some(&flownode_addrs_json));
        }

        let mut source_table_names = vec![];
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for table_id in flow_info.source_table_ids() {
            if let Some(table_info) = catalog_manager.table_info_by_id(*table_id).await? {
                source_table_names.push(table_info.full_table_name());
            }
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
            Arc::new(self.flownode_addr_groups.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaFlows {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
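
// A minimal test sketch added for illustration (an assumption, not part of
// the original file); it assumes `Schema::column_schemas()` and the public
// `name` field on `ColumnSchema` from the `datatypes` crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn flows_schema_has_expected_columns() {
        let schema = InformationSchemaFlows::schema();
        let columns = schema.column_schemas();
        // All sixteen columns declared above, in declaration order.
        assert_eq!(columns.len(), 16);
        assert_eq!(columns[0].name, FLOW_NAME);
        assert_eq!(columns[15].name, FLOWNODE_ADDRS);
    }
}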