catalog/system_schema/information_schema/flows.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_meta::key::FlowId;
20use common_meta::key::flow::FlowMetadataManager;
21use common_meta::key::flow::flow_info::FlowInfoValue;
22use common_meta::key::flow::flow_state::FlowStat;
23use common_recordbatch::adapter::RecordBatchStreamAdapter;
24use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datatypes::prelude::ConcreteDataType as CDT;
29use datatypes::scalars::ScalarVectorBuilder;
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::timestamp::TimestampMillisecond;
32use datatypes::value::Value;
33use datatypes::vectors::{
34    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
35    UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
36};
37use futures::TryStreamExt;
38use snafu::{OptionExt, ResultExt};
39use sql::ast::Ident;
40use sql::dialect::GreptimeDbDialect;
41use sql::parser::ParserContext;
42use sql::statements::create::{CreateFlow, SqlOrTql};
43use sql::statements::statement::Statement;
44use store_api::storage::{ScanRequest, TableId};
45
46use crate::CatalogManager;
47use crate::error::{
48    CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
49    Result, UpgradeWeakCatalogManagerRefSnafu,
50};
51use crate::information_schema::{FLOWS, Predicates};
52use crate::system_schema::information_schema::InformationTable;
53use crate::system_schema::utils;
54
/// Initial capacity for each per-column vector builder used when assembling rows.
const INIT_CAPACITY: usize = 42;

// Column names of `information_schema.flows`.
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
74
/// The `information_schema.flows` table provides information about flows in databases.
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    // Table schema, built once via [`InformationSchemaFlows::schema`].
    schema: SchemaRef,
    // Catalog whose flows this virtual table exposes.
    catalog_name: String,
    // Held weakly; upgraded on demand while building rows.
    catalog_manager: Weak<dyn CatalogManager>,
    // Source of flow metadata (flow names, definitions, flownode routing).
    flow_metadata_manager: Arc<FlowMetadataManager>,
}
83
84impl InformationSchemaFlows {
85    pub(super) fn new(
86        catalog_name: String,
87        catalog_manager: Weak<dyn CatalogManager>,
88        flow_metadata_manager: Arc<FlowMetadataManager>,
89    ) -> Self {
90        Self {
91            schema: Self::schema(),
92            catalog_name,
93            catalog_manager,
94            flow_metadata_manager,
95        }
96    }
97
98    /// for complex fields(including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]), it will be serialized to json string for now
99    /// TODO(discord9): use a better way to store complex fields like json type
100    pub(crate) fn schema() -> SchemaRef {
101        Arc::new(Schema::new(
102            vec![
103                (FLOW_NAME, CDT::string_datatype(), false),
104                (FLOW_ID, CDT::uint32_datatype(), false),
105                (STATE_SIZE, CDT::uint64_datatype(), true),
106                (TABLE_CATALOG, CDT::string_datatype(), false),
107                (FLOW_DEFINITION, CDT::string_datatype(), false),
108                (COMMENT, CDT::string_datatype(), true),
109                (EXPIRE_AFTER, CDT::int64_datatype(), true),
110                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
111                (SINK_TABLE_NAME, CDT::string_datatype(), false),
112                (FLOWNODE_IDS, CDT::string_datatype(), true),
113                (OPTIONS, CDT::string_datatype(), true),
114                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
115                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
116                (
117                    LAST_EXECUTION_TIME,
118                    CDT::timestamp_millisecond_datatype(),
119                    true,
120                ),
121                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
122            ]
123            .into_iter()
124            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
125            .collect(),
126        ))
127    }
128
129    /// Generates the CREATE FLOW statement for the flow_definition column
130    pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result<String> {
131        let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql())
132            .map_err(BoxedError::new)
133            .context(InternalSnafu)?;
134
135        let query = parser_ctx
136            .parse_statement()
137            .map_err(BoxedError::new)
138            .context(InternalSnafu)?;
139
140        let raw_query = match &query {
141            Statement::Tql(_) => flow_info.raw_sql().clone(),
142            _ => query.to_string(),
143        };
144
145        let query = Box::new(
146            SqlOrTql::try_from_statement(query, &raw_query)
147                .map_err(BoxedError::new)
148                .context(InternalSnafu)?,
149        );
150
151        let comment = if flow_info.comment().is_empty() {
152            None
153        } else {
154            Some(flow_info.comment().clone())
155        };
156
157        let stmt = CreateFlow {
158            flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]),
159            sink_table_name: sql::ast::ObjectName::from(vec![
160                Ident::new(&flow_info.sink_table_name().schema_name),
161                Ident::new(&flow_info.sink_table_name().table_name),
162            ]),
163            or_replace: false,
164            if_not_exists: true,
165            expire_after: flow_info.expire_after(),
166            eval_interval: flow_info.eval_interval(),
167            comment,
168            query,
169        };
170
171        Ok(stmt.to_string())
172    }
173
174    fn builder(&self) -> InformationSchemaFlowsBuilder {
175        InformationSchemaFlowsBuilder::new(
176            self.schema.clone(),
177            self.catalog_name.clone(),
178            self.catalog_manager.clone(),
179            &self.flow_metadata_manager,
180        )
181    }
182}
183
184impl InformationTable for InformationSchemaFlows {
185    fn table_id(&self) -> TableId {
186        INFORMATION_SCHEMA_FLOW_TABLE_ID
187    }
188
189    fn table_name(&self) -> &'static str {
190        FLOWS
191    }
192
193    fn schema(&self) -> SchemaRef {
194        self.schema.clone()
195    }
196
197    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
198        let schema = self.schema.arrow_schema().clone();
199        let mut builder = self.builder();
200        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
201            schema,
202            futures::stream::once(async move {
203                builder
204                    .make_flows(Some(request))
205                    .await
206                    .map(|x| x.into_df_record_batch())
207                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
208            }),
209        ));
210        Ok(Box::pin(
211            RecordBatchStreamAdapter::try_new(stream)
212                .map_err(BoxedError::new)
213                .context(InternalSnafu)?,
214        ))
215    }
216}
217
/// Builds the `information_schema.FLOWS` table row by row
///
/// columns are based on [`FlowInfoValue`]
struct InformationSchemaFlowsBuilder {
    // Target schema; the builders below must stay in the same column order.
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,

    // One vector builder per output column, in schema order.
    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
}
243
impl InformationSchemaFlowsBuilder {
    /// Creates a builder whose per-column vector builders are pre-allocated
    /// with [`INIT_CAPACITY`] slots.
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.flows` virtual table
    ///
    /// Lists every flow registered under `self.catalog_name`, fetches each
    /// flow's metadata plus runtime stats, and appends one row per flow
    /// (subject to predicates derived from `request`).
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        // Used in `add_flow` to skip rows the scan filters out.
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        // TODO(discord9): use `AsyncIterator` once it's stable-ish
        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        // Runtime statistics (state size, last execution time); `None` when
        // the information extension reports no stats.
        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            // A listed name without flow info is inconsistent metadata;
            // surface it as an explicit error rather than skipping.
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .with_context(|| FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.clone(),
                    flow_name: flow_name.clone(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

    /// Appends one row for `flow_info` to the column builders, unless the
    /// row is rejected by `predicates`.
    ///
    /// NOTE: the push order below must stay in sync with the column order of
    /// [`InformationSchemaFlows::schema`] and with [`Self::finish`].
    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        // Only the primary-key columns participate in predicate pruning.
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().clone()),
            ),
        ];
        if !predicates.eval(&row) {
            return Ok(());
        }
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        // State size is only present when runtime stats include this flow.
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        // The definition column shows a regenerated `CREATE FLOW` statement.
        self.raw_sqls
            .push(Some(&InformationSchemaFlows::generate_show_create_flow(
                &flow_info,
            )?));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        // Complex fields are serialized as JSON strings for now; see the TODO
        // on the schema builder.
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context({
                JsonSnafu {
                    input: format!("{:?}", flow_info.flownode_ids()),
                }
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));

        // Resolve source table ids to full table names; ids that no longer
        // resolve in the catalog are skipped.
        let mut source_table_names = vec![];
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for table_id in flow_info.source_table_ids() {
            if let Some(table_info) = catalog_manager.table_info_by_id(*table_id).await? {
                source_table_names.push(table_info.full_table_name());
            }
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }

    /// Finalizes every column builder into a single [`RecordBatch`].
    ///
    /// Column order here must match [`InformationSchemaFlows::schema`].
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}
420
421impl DfPartitionStream for InformationSchemaFlows {
422    fn schema(&self) -> &arrow_schema::SchemaRef {
423        self.schema.arrow_schema()
424    }
425
426    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
427        let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
428        let mut builder = self.builder();
429        Box::pin(DfRecordBatchStreamAdapter::new(
430            schema,
431            futures::stream::once(async move {
432                builder
433                    .make_flows(None)
434                    .await
435                    .map(|x| x.into_df_record_batch())
436                    .map_err(Into::into)
437            }),
438        ))
439    }
440}