catalog/system_schema/information_schema/
procedure_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_procedure::ProcedureInfo;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
23use common_time::timestamp::Timestamp;
24use datafusion::execution::TaskContext;
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
26use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
27use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
28use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use datatypes::timestamp::TimestampMillisecond;
31use datatypes::value::Value;
32use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder};
33use snafu::ResultExt;
34use store_api::storage::{ScanRequest, TableId};
35
36use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
37use crate::system_schema::information_schema::{InformationTable, Predicates, PROCEDURE_INFO};
38use crate::system_schema::utils;
39use crate::CatalogManager;
40
// Column names of the `information_schema.procedure_info` virtual table.
const PROCEDURE_ID: &str = "procedure_id";
const PROCEDURE_TYPE: &str = "procedure_type";
const START_TIME: &str = "start_time";
const END_TIME: &str = "end_time";
const STATUS: &str = "status";
const LOCK_KEYS: &str = "lock_keys";

// Initial capacity for the per-column vector builders; rows beyond this grow the builders.
const INIT_CAPACITY: usize = 42;
49
/// The `PROCEDURE_INFO` table provides information about the current procedure information of the cluster.
///
/// - `procedure_id`: the unique identifier of the procedure.
/// - `procedure_type`: the type name of the procedure.
/// - `start_time`: the starting execution time of the procedure.
/// - `end_time`: the ending execution time of the procedure.
/// - `status`: the status of the procedure.
/// - `lock_keys`: the lock keys of the procedure.
#[derive(Debug)]
pub(super) struct InformationSchemaProcedureInfo {
    /// Schema of the virtual table (see [`InformationSchemaProcedureInfo::schema`]).
    schema: SchemaRef,
    /// Weak handle to the catalog manager, used to reach the information extension lazily.
    catalog_manager: Weak<dyn CatalogManager>,
}
63
64impl InformationSchemaProcedureInfo {
65    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
66        Self {
67            schema: Self::schema(),
68            catalog_manager,
69        }
70    }
71
72    pub(crate) fn schema() -> SchemaRef {
73        Arc::new(Schema::new(vec![
74            ColumnSchema::new(PROCEDURE_ID, ConcreteDataType::string_datatype(), false),
75            ColumnSchema::new(PROCEDURE_TYPE, ConcreteDataType::string_datatype(), false),
76            ColumnSchema::new(
77                START_TIME,
78                ConcreteDataType::timestamp_millisecond_datatype(),
79                true,
80            ),
81            ColumnSchema::new(
82                END_TIME,
83                ConcreteDataType::timestamp_millisecond_datatype(),
84                true,
85            ),
86            ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), false),
87            ColumnSchema::new(LOCK_KEYS, ConcreteDataType::string_datatype(), true),
88        ]))
89    }
90
91    fn builder(&self) -> InformationSchemaProcedureInfoBuilder {
92        InformationSchemaProcedureInfoBuilder::new(
93            self.schema.clone(),
94            self.catalog_manager.clone(),
95        )
96    }
97}
98
99impl InformationTable for InformationSchemaProcedureInfo {
100    fn table_id(&self) -> TableId {
101        INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID
102    }
103
104    fn table_name(&self) -> &'static str {
105        PROCEDURE_INFO
106    }
107
108    fn schema(&self) -> SchemaRef {
109        self.schema.clone()
110    }
111
112    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
113        let schema = self.schema.arrow_schema().clone();
114        let mut builder = self.builder();
115        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
116            schema,
117            futures::stream::once(async move {
118                builder
119                    .make_procedure_info(Some(request))
120                    .await
121                    .map(|x| x.into_df_record_batch())
122                    .map_err(Into::into)
123            }),
124        ));
125        Ok(Box::pin(
126            RecordBatchStreamAdapter::try_new(stream)
127                .map_err(BoxedError::new)
128                .context(InternalSnafu)?,
129        ))
130    }
131}
132
/// Accumulates procedure rows column-by-column and emits them as one [`RecordBatch`].
struct InformationSchemaProcedureInfoBuilder {
    /// Output schema; must match the column order produced in `finish`.
    schema: SchemaRef,
    /// Weak handle used to fetch procedures from the information extension.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per output column, in schema order.
    procedure_ids: StringVectorBuilder,
    procedure_types: StringVectorBuilder,
    start_times: TimestampMillisecondVectorBuilder,
    end_times: TimestampMillisecondVectorBuilder,
    statuses: StringVectorBuilder,
    lock_keys: StringVectorBuilder,
}
144
145impl InformationSchemaProcedureInfoBuilder {
146    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
147        Self {
148            schema,
149            catalog_manager,
150            procedure_ids: StringVectorBuilder::with_capacity(INIT_CAPACITY),
151            procedure_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
152            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
153            end_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
154            statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
155            lock_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY),
156        }
157    }
158
159    /// Construct the `information_schema.procedure_info` virtual table
160    async fn make_procedure_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
161        let predicates = Predicates::from_scan_request(&request);
162        let information_extension = utils::information_extension(&self.catalog_manager)?;
163        let procedures = information_extension.procedures().await?;
164        for (status, procedure_info) in procedures {
165            self.add_procedure(&predicates, status, procedure_info);
166        }
167        self.finish()
168    }
169
170    fn add_procedure(
171        &mut self,
172        predicates: &Predicates,
173        status: String,
174        procedure_info: ProcedureInfo,
175    ) {
176        let ProcedureInfo {
177            id,
178            type_name,
179            start_time_ms,
180            end_time_ms,
181            lock_keys,
182            ..
183        } = procedure_info;
184        let pid = id.to_string();
185        let start_time = TimestampMillisecond(Timestamp::new_millisecond(start_time_ms));
186        let end_time = TimestampMillisecond(Timestamp::new_millisecond(end_time_ms));
187        let lock_keys = lock_keys.join(",");
188
189        let row = [
190            (PROCEDURE_ID, &Value::from(pid.clone())),
191            (PROCEDURE_TYPE, &Value::from(type_name.clone())),
192            (START_TIME, &Value::from(start_time)),
193            (END_TIME, &Value::from(end_time)),
194            (STATUS, &Value::from(status.clone())),
195            (LOCK_KEYS, &Value::from(lock_keys.clone())),
196        ];
197        if !predicates.eval(&row) {
198            return;
199        }
200        self.procedure_ids.push(Some(&pid));
201        self.procedure_types.push(Some(&type_name));
202        self.start_times.push(Some(start_time));
203        self.end_times.push(Some(end_time));
204        self.statuses.push(Some(&status));
205        self.lock_keys.push(Some(&lock_keys));
206    }
207
208    fn finish(&mut self) -> Result<RecordBatch> {
209        let columns: Vec<VectorRef> = vec![
210            Arc::new(self.procedure_ids.finish()),
211            Arc::new(self.procedure_types.finish()),
212            Arc::new(self.start_times.finish()),
213            Arc::new(self.end_times.finish()),
214            Arc::new(self.statuses.finish()),
215            Arc::new(self.lock_keys.finish()),
216        ];
217        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
218    }
219}
220
221impl DfPartitionStream for InformationSchemaProcedureInfo {
222    fn schema(&self) -> &ArrowSchemaRef {
223        self.schema.arrow_schema()
224    }
225
226    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
227        let schema = self.schema.arrow_schema().clone();
228        let mut builder = self.builder();
229        Box::pin(DfRecordBatchStreamAdapter::new(
230            schema,
231            futures::stream::once(async move {
232                builder
233                    .make_procedure_info(None)
234                    .await
235                    .map(|x| x.into_df_record_batch())
236                    .map_err(Into::into)
237            }),
238        ))
239    }
240}