catalog/system_schema/information_schema/procedure_info.rs
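
// Implements the `PROCEDURE_INFO` table of `information_schema`. A sketch of a
// typical query (illustrative only; column names match the constants below):
//
//   SELECT procedure_id, procedure_type, status
//   FROM information_schema.procedure_info;
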
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_procedure::ProcedureInfo;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{InformationTable, Predicates, PROCEDURE_INFO};
use crate::system_schema::utils;
use crate::CatalogManager;

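// Column names of the `PROCEDURE_INFO` table.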
const PROCEDURE_ID: &str = "procedure_id";
const PROCEDURE_TYPE: &str = "procedure_type";
const START_TIME: &str = "start_time";
const END_TIME: &str = "end_time";
const STATUS: &str = "status";
const LOCK_KEYS: &str = "lock_keys";

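// Initial capacity for each column's vector builder.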
const INIT_CAPACITY: usize = 42;

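/// The `information_schema.PROCEDURE_INFO` table, with one row per procedure
/// reported by the catalog's information extension (id, type, timing, status,
/// and lock keys).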
#[derive(Debug)]
pub(super) struct InformationSchemaProcedureInfo {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaProcedureInfo {
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_manager,
        }
    }

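    /// Builds the schema of the `PROCEDURE_INFO` table; `start_time`,
    /// `end_time`, and `lock_keys` are nullable.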
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(PROCEDURE_ID, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PROCEDURE_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                START_TIME,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(
                END_TIME,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(LOCK_KEYS, ConcreteDataType::string_datatype(), true),
        ]))
    }

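    /// Creates a row builder bound to this table's schema and catalog manager.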
    fn builder(&self) -> InformationSchemaProcedureInfoBuilder {
        InformationSchemaProcedureInfoBuilder::new(
            self.schema.clone(),
            self.catalog_manager.clone(),
        )
    }
}

impl InformationTable for InformationSchemaProcedureInfo {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PROCEDURE_INFO
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

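    /// Builds the scan stream; the procedures are fetched lazily when the
    /// stream is first polled, with the `ScanRequest` converted into row
    /// predicates.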
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_procedure_info(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

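/// Row builder for the `PROCEDURE_INFO` table; one vector builder per column,
/// in schema order.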
struct InformationSchemaProcedureInfoBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    procedure_ids: StringVectorBuilder,
    procedure_types: StringVectorBuilder,
    start_times: TimestampMillisecondVectorBuilder,
    end_times: TimestampMillisecondVectorBuilder,
    statuses: StringVectorBuilder,
    lock_keys: StringVectorBuilder,
}

impl InformationSchemaProcedureInfoBuilder {
    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema,
            catalog_manager,
            procedure_ids: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            procedure_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            end_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            lock_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

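    /// Fetches all procedures from the information extension and appends those
    /// matching the (optional) scan request's predicates.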
    async fn make_procedure_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let predicates = Predicates::from_scan_request(&request);
        let information_extension = utils::information_extension(&self.catalog_manager)?;
        let procedures = information_extension.procedures().await?;
        for (status, procedure_info) in procedures {
            self.add_procedure(&predicates, status, procedure_info);
        }
        self.finish()
    }

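    /// Appends one procedure as a row; rows rejected by the predicates are
    /// skipped.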
    fn add_procedure(
        &mut self,
        predicates: &Predicates,
        status: String,
        procedure_info: ProcedureInfo,
    ) {
        let ProcedureInfo {
            id,
            type_name,
            start_time_ms,
            end_time_ms,
            lock_keys,
            ..
        } = procedure_info;
        let pid = id.to_string();
        let start_time = TimestampMillisecond(Timestamp::new_millisecond(start_time_ms));
        let end_time = TimestampMillisecond(Timestamp::new_millisecond(end_time_ms));
        let lock_keys = lock_keys.join(",");

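        // Evaluate the pushed-down predicates against the candidate row before
        // materializing it into the column builders.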
        let row = [
            (PROCEDURE_ID, &Value::from(pid.clone())),
            (PROCEDURE_TYPE, &Value::from(type_name.clone())),
            (START_TIME, &Value::from(start_time)),
            (END_TIME, &Value::from(end_time)),
            (STATUS, &Value::from(status.clone())),
            (LOCK_KEYS, &Value::from(lock_keys.clone())),
        ];
        if !predicates.eval(&row) {
            return;
        }
        self.procedure_ids.push(Some(&pid));
        self.procedure_types.push(Some(&type_name));
        self.start_times.push(Some(start_time));
        self.end_times.push(Some(end_time));
        self.statuses.push(Some(&status));
        self.lock_keys.push(Some(&lock_keys));
    }

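    /// Finishes the vector builders and assembles them into a `RecordBatch`.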
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.procedure_ids.finish()),
            Arc::new(self.procedure_types.finish()),
            Arc::new(self.start_times.finish()),
            Arc::new(self.end_times.finish()),
            Arc::new(self.statuses.finish()),
            Arc::new(self.lock_keys.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

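// Lets DataFusion drive the table directly as a partition stream; this path
// performs a full scan (no `ScanRequest`, hence no predicate filtering).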
impl DfPartitionStream for InformationSchemaProcedureInfo {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_procedure_info(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}