catalog/system_schema/information_schema/flows.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
    UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use crate::error::{
    CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
    Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;

const INIT_CAPACITY: usize = 42;

// Column names of `information_schema.flows`.
// The primary key is (flow_name, flow_id, table_catalog).
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
pub const CREATED_TIME: &str = "created_time";
pub const UPDATED_TIME: &str = "updated_time";
pub const LAST_EXECUTION_TIME: &str = "last_execution_time";
pub const SOURCE_TABLE_NAMES: &str = "source_table_names";
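
// For illustration, a query over this table might look like the following
// (hypothetical flow and catalog names, not defined in this file):
//
//   SELECT flow_name, flow_id, state_size, source_table_names
//   FROM information_schema.flows
//   WHERE table_catalog = 'greptime';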

/// The `information_schema.flows` table provides information about flows in databases.
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,
}

impl InformationSchemaFlows {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            flow_metadata_manager,
        }
    }

    /// Complex fields ([`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]) are
    /// serialized to JSON strings for now.
    /// TODO(discord9): use a better way to store complex fields, e.g. a JSON type
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(
            vec![
                (FLOW_NAME, CDT::string_datatype(), false),
                (FLOW_ID, CDT::uint32_datatype(), false),
                (STATE_SIZE, CDT::uint64_datatype(), true),
                (TABLE_CATALOG, CDT::string_datatype(), false),
                (FLOW_DEFINITION, CDT::string_datatype(), false),
                (COMMENT, CDT::string_datatype(), true),
                (EXPIRE_AFTER, CDT::int64_datatype(), true),
                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
                (SINK_TABLE_NAME, CDT::string_datatype(), false),
                (FLOWNODE_IDS, CDT::string_datatype(), true),
                (OPTIONS, CDT::string_datatype(), true),
                (CREATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (UPDATED_TIME, CDT::timestamp_millisecond_datatype(), false),
                (
                    LAST_EXECUTION_TIME,
                    CDT::timestamp_millisecond_datatype(),
                    true,
                ),
                (SOURCE_TABLE_NAMES, CDT::string_datatype(), true),
            ]
            .into_iter()
            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
            .collect(),
        ))
    }
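
    // A sketch of the JSON serialization mentioned above (hypothetical table
    // ids): a flow reading from source tables 1024 and 1025 stores
    // SOURCE_TABLE_IDS as the string "[1024,1025]", i.e. the output of
    // `serde_json::to_string(&vec![1024u32, 1025u32])`.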

    fn builder(&self) -> InformationSchemaFlowsBuilder {
        InformationSchemaFlowsBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            &self.flow_metadata_manager,
        )
    }
}

impl InformationTable for InformationSchemaFlows {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_FLOW_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        FLOWS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}
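
// A sketch of consuming the stream returned by `to_stream` above (assumes an
// async runtime and a `flows_table: InformationSchemaFlows` in scope;
// `futures::TryStreamExt` is already imported in this file):
//
//   let mut stream = flows_table.to_stream(ScanRequest::default())?;
//   while let Some(batch) = stream.try_next().await? {
//       // each `batch` is a `RecordBatch` of `information_schema.flows` rows
//   }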

/// Builds the `information_schema.FLOWS` table row by row
///
/// columns are based on [`FlowInfoValue`]
struct InformationSchemaFlowsBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    flow_metadata_manager: Arc<FlowMetadataManager>,

    flow_names: StringVectorBuilder,
    flow_ids: UInt32VectorBuilder,
    state_sizes: UInt64VectorBuilder,
    table_catalogs: StringVectorBuilder,
    raw_sqls: StringVectorBuilder,
    comments: StringVectorBuilder,
    expire_afters: Int64VectorBuilder,
    source_table_id_groups: StringVectorBuilder,
    sink_table_names: StringVectorBuilder,
    flownode_id_groups: StringVectorBuilder,
    option_groups: StringVectorBuilder,
    created_time: TimestampMillisecondVectorBuilder,
    updated_time: TimestampMillisecondVectorBuilder,
    last_execution_time: TimestampMillisecondVectorBuilder,
    source_table_names: StringVectorBuilder,
}

impl InformationSchemaFlowsBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        flow_metadata_manager: &Arc<FlowMetadataManager>,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            flow_metadata_manager: flow_metadata_manager.clone(),

            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            created_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            updated_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            last_execution_time: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            source_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Construct the `information_schema.flows` virtual table
    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let predicates = Predicates::from_scan_request(&request);

        let flow_info_manager = self.flow_metadata_manager.clone();

        // TODO(discord9): use `AsyncIterator` once it's stable-ish
        let mut stream = flow_info_manager
            .flow_name_manager()
            .flow_names(&catalog_name)
            .await;

        let flow_stat = {
            let information_extension = utils::information_extension(&self.catalog_manager)?;
            information_extension.flow_stats().await?
        };
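
        // `flow_stat` is fetched once up front and shared across all rows:
        // it carries per-flow state sizes and last execution times keyed by
        // flow id (see `add_flow` below).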

        while let Some((flow_name, flow_id)) = stream
            .try_next()
            .await
            .map_err(BoxedError::new)
            .context(ListFlowsSnafu {
                catalog: &catalog_name,
            })?
        {
            let flow_info = flow_info_manager
                .flow_info_manager()
                .get(flow_id.flow_id())
                .await
                .map_err(BoxedError::new)
                .context(InternalSnafu)?
                .context(FlowInfoNotFoundSnafu {
                    catalog_name: catalog_name.to_string(),
                    flow_name: flow_name.to_string(),
                })?;
            self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
                .await?;
        }

        self.finish()
    }

    async fn add_flow(
        &mut self,
        predicates: &Predicates,
        flow_id: FlowId,
        flow_info: FlowInfoValue,
        flow_stat: &Option<FlowStat>,
    ) -> Result<()> {
        let row = [
            (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
            (FLOW_ID, &Value::from(flow_id)),
            (
                TABLE_CATALOG,
                &Value::from(flow_info.catalog_name().to_string()),
            ),
        ];
        if !predicates.eval(&row) {
            return Ok(());
        }
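
        // Only the three key columns above take part in predicate evaluation;
        // e.g. `WHERE flow_name = 'my_flow'` (hypothetical name) prunes rows
        // here, before any of the JSON serialization below runs.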
        self.flow_names.push(Some(flow_info.flow_name()));
        self.flow_ids.push(Some(flow_id));
        self.state_sizes.push(
            flow_stat
                .as_ref()
                .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
        );
        self.table_catalogs.push(Some(flow_info.catalog_name()));
        self.raw_sqls.push(Some(flow_info.raw_sql()));
        self.comments.push(Some(flow_info.comment()));
        self.expire_afters.push(flow_info.expire_after());
        self.source_table_id_groups.push(Some(
            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
                input: format!("{:?}", flow_info.source_table_ids()),
            })?,
        ));
        self.sink_table_names
            .push(Some(&flow_info.sink_table_name().to_string()));
        self.flownode_id_groups.push(Some(
            &serde_json::to_string(flow_info.flownode_ids()).context({
                JsonSnafu {
                    input: format!("{:?}", flow_info.flownode_ids()),
                }
            })?,
        ));
        self.option_groups
            .push(Some(&serde_json::to_string(flow_info.options()).context(
                JsonSnafu {
                    input: format!("{:?}", flow_info.options()),
                },
            )?));
        self.created_time
            .push(Some(flow_info.created_time().timestamp_millis().into()));
        self.updated_time
            .push(Some(flow_info.updated_time().timestamp_millis().into()));
        self.last_execution_time
            .push(flow_stat.as_ref().and_then(|state| {
                state
                    .last_exec_time_map
                    .get(&flow_id)
                    .map(|v| TimestampMillisecond::new(*v))
            }));

        let mut source_table_names = vec![];
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
            source_table_names.extend(
                catalog_manager
                    .tables_by_ids(&catalog_name, &schema_name, flow_info.source_table_ids())
                    .await?
                    .into_iter()
                    .map(|table| table.table_info().full_table_name()),
            );
        }

        let source_table_names = source_table_names.join(",");
        self.source_table_names.push(Some(&source_table_names));

        Ok(())
    }
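
    // Note: `finish` below must emit its column vectors in exactly the order
    // declared in `schema()`, since schema and columns are paired positionally.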

    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.flow_names.finish()),
            Arc::new(self.flow_ids.finish()),
            Arc::new(self.state_sizes.finish()),
            Arc::new(self.table_catalogs.finish()),
            Arc::new(self.raw_sqls.finish()),
            Arc::new(self.comments.finish()),
            Arc::new(self.expire_afters.finish()),
            Arc::new(self.source_table_id_groups.finish()),
            Arc::new(self.sink_table_names.finish()),
            Arc::new(self.flownode_id_groups.finish()),
            Arc::new(self.option_groups.finish()),
            Arc::new(self.created_time.finish()),
            Arc::new(self.updated_time.finish()),
            Arc::new(self.last_execution_time.finish()),
            Arc::new(self.source_table_names.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

impl DfPartitionStream for InformationSchemaFlows {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_flows(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}
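
// A minimal sanity-check sketch (hypothetical test module, not part of the
// original file): the schema declared in `schema()` should line up with the
// fifteen column vectors produced by `finish`.
#[cfg(test)]
mod flows_schema_sketch {
    use super::*;

    #[test]
    fn schema_matches_builder_columns() {
        let schema = InformationSchemaFlows::schema();
        // One ColumnSchema per (name, type, nullable) triple listed in `schema()`.
        assert_eq!(schema.column_schemas().len(), 15);
        assert_eq!(schema.column_schemas()[0].name, FLOW_NAME);
    }
}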