// catalog/system_schema/information_schema/process_list.rs
use std::sync::Arc;
17use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
18use common_error::ext::BoxedError;
19use common_frontend::DisplayProcessId;
20use common_recordbatch::adapter::RecordBatchStreamAdapter;
21use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
22use common_time::util::current_time_millis;
23use common_time::{Duration, Timestamp};
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datatypes::prelude::ConcreteDataType as CDT;
26use datatypes::scalars::ScalarVectorBuilder;
27use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{
30 DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
31 VectorRef,
32};
33use snafu::ResultExt;
34use store_api::storage::{ScanRequest, TableId};
35
36use crate::error::{self, InternalSnafu};
37use crate::information_schema::Predicates;
38use crate::process_manager::ProcessManagerRef;
39use crate::system_schema::information_schema::InformationTable;
40
/// Column name: display id of the process (built from the frontend address and process id).
pub const ID: &str = "id";
/// Column name: catalog the query runs against.
pub const CATALOG: &str = "catalog";
/// Column name: comma-separated list of schemas used by the query.
pub const SCHEMAS: &str = "schemas";
/// Column name: the query string itself.
pub const QUERY: &str = "query";
/// Column name: address of the client that issued the query.
pub const CLIENT: &str = "client";
/// Column name: frontend node serving the query.
pub const FRONTEND: &str = "frontend";
/// Column name: query start time (millisecond timestamp).
pub const START_TIMESTAMP: &str = "start_timestamp";
/// Column name: elapsed running time (millisecond duration).
pub const ELAPSED_TIME: &str = "elapsed_time";
50
/// The `information_schema.process_list` virtual table.
///
/// Exposes the processes known to the process manager as a queryable table.
pub struct InformationSchemaProcessList {
    // Schema of this table, built once in `new` and cloned on each access.
    schema: SchemaRef,
    // Source of the process information rendered by this table.
    process_manager: ProcessManagerRef,
}
57
58impl InformationSchemaProcessList {
59 pub fn new(process_manager: ProcessManagerRef) -> Self {
60 Self {
61 schema: Self::schema(),
62 process_manager,
63 }
64 }
65
66 fn schema() -> SchemaRef {
67 Arc::new(Schema::new(vec![
68 ColumnSchema::new(ID, CDT::string_datatype(), false),
69 ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
70 ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
71 ColumnSchema::new(QUERY, CDT::string_datatype(), false),
72 ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
73 ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
74 ColumnSchema::new(
75 START_TIMESTAMP,
76 CDT::timestamp_millisecond_datatype(),
77 false,
78 ),
79 ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
80 ]))
81 }
82}
83
84impl InformationTable for InformationSchemaProcessList {
85 fn table_id(&self) -> TableId {
86 INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
87 }
88
89 fn table_name(&self) -> &'static str {
90 "process_list"
91 }
92
93 fn schema(&self) -> SchemaRef {
94 self.schema.clone()
95 }
96
97 fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
98 let process_manager = self.process_manager.clone();
99 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
100 self.schema.arrow_schema().clone(),
101 futures::stream::once(async move {
102 make_process_list(process_manager, request)
103 .await
104 .map(RecordBatch::into_df_record_batch)
105 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
106 }),
107 ));
108
109 Ok(Box::pin(
110 RecordBatchStreamAdapter::try_new(stream)
111 .map_err(BoxedError::new)
112 .context(InternalSnafu)?,
113 ))
114 }
115}
116
117async fn make_process_list(
119 process_manager: ProcessManagerRef,
120 request: ScanRequest,
121) -> error::Result<RecordBatch> {
122 let predicates = Predicates::from_scan_request(&Some(request));
123 let current_time = current_time_millis();
124 let queries = process_manager.list_all_processes(None).await?;
126
127 let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
128 let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
129 let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
130 let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
131 let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
132 let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
133 let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
134 let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());
135
136 for process in queries {
137 let display_id = DisplayProcessId {
138 server_addr: process.frontend.to_string(),
139 id: process.id,
140 }
141 .to_string();
142 let schemas = process.schemas.join(",");
143 let id = Value::from(display_id);
144 let catalog = Value::from(process.catalog);
145 let schemas = Value::from(schemas);
146 let query = Value::from(process.query);
147 let client = Value::from(process.client);
148 let frontend = Value::from(process.frontend);
149 let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
150 let elapsed_time = Value::from(Duration::new_millisecond(
151 current_time - process.start_timestamp,
152 ));
153 let row = [
154 (ID, &id),
155 (CATALOG, &catalog),
156 (SCHEMAS, &schemas),
157 (QUERY, &query),
158 (CLIENT, &client),
159 (FRONTEND, &frontend),
160 (START_TIMESTAMP, &start_timestamp),
161 (ELAPSED_TIME, &elapsed_time),
162 ];
163 if predicates.eval(&row) {
164 id_builder.push(id.as_string().as_deref());
165 catalog_builder.push(catalog.as_string().as_deref());
166 schemas_builder.push(schemas.as_string().as_deref());
167 query_builder.push(query.as_string().as_deref());
168 client_builder.push(client.as_string().as_deref());
169 frontend_builder.push(frontend.as_string().as_deref());
170 start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
171 elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
172 }
173 }
174
175 RecordBatch::new(
176 InformationSchemaProcessList::schema(),
177 vec![
178 Arc::new(id_builder.finish()) as VectorRef,
179 Arc::new(catalog_builder.finish()) as VectorRef,
180 Arc::new(schemas_builder.finish()) as VectorRef,
181 Arc::new(query_builder.finish()) as VectorRef,
182 Arc::new(client_builder.finish()) as VectorRef,
183 Arc::new(frontend_builder.finish()) as VectorRef,
184 Arc::new(start_time_builder.finish()) as VectorRef,
185 Arc::new(elapsed_time_builder.finish()) as VectorRef,
186 ],
187 )
188 .context(error::CreateRecordBatchSnafu)
189}