catalog/system_schema/information_schema/process_list.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::consts::INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID;
use common_error::ext::BoxedError;
use common_frontend::DisplayProcessId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use common_time::{Duration, Timestamp};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
    DurationMillisecondVectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
    VectorRef,
};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{self, InternalSnafu};
use crate::information_schema::Predicates;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
pub const ID: &str = "id";
pub const CATALOG: &str = "catalog";
pub const SCHEMAS: &str = "schemas";
pub const QUERY: &str = "query";
pub const CLIENT: &str = "client";
pub const FRONTEND: &str = "frontend";
pub const START_TIMESTAMP: &str = "start_timestamp";
pub const ELAPSED_TIME: &str = "elapsed_time";

/// `information_schema.process_list` table implementation that tracks running
/// queries in the current cluster.
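///
/// Each row describes one in-flight query; the table can be queried like any
/// other `information_schema` table, e.g.
/// `SELECT * FROM information_schema.process_list`.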
pub struct InformationSchemaProcessList {
    schema: SchemaRef,
    process_manager: ProcessManagerRef,
}

impl InformationSchemaProcessList {
    pub fn new(process_manager: ProcessManagerRef) -> Self {
        Self {
            schema: Self::schema(),
            process_manager,
        }
    }

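    /// Builds the table schema. The column order here must match the order of
    /// the column vectors assembled in [`make_process_list`].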
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(ID, CDT::string_datatype(), false),
            ColumnSchema::new(CATALOG, CDT::string_datatype(), false),
            ColumnSchema::new(SCHEMAS, CDT::string_datatype(), false),
            ColumnSchema::new(QUERY, CDT::string_datatype(), false),
            ColumnSchema::new(CLIENT, CDT::string_datatype(), false),
            ColumnSchema::new(FRONTEND, CDT::string_datatype(), false),
            ColumnSchema::new(
                START_TIMESTAMP,
                CDT::timestamp_millisecond_datatype(),
                false,
            ),
            ColumnSchema::new(ELAPSED_TIME, CDT::duration_millisecond_datatype(), false),
        ]))
    }
}

impl InformationTable for InformationSchemaProcessList {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        "process_list"
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

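    /// Scans the table as a stream that yields a single record batch; the
    /// batch is built lazily, only when the stream is first polled.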
    fn to_stream(&self, request: ScanRequest) -> error::Result<SendableRecordBatchStream> {
        let process_manager = self.process_manager.clone();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            self.schema.arrow_schema().clone(),
            futures::stream::once(async move {
                make_process_list(process_manager, request)
                    .await
                    .map(RecordBatch::into_df_record_batch)
                    .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
            }),
        ));

        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Builds the running process list as a single record batch, keeping only the
/// rows that satisfy the scan predicates.
async fn make_process_list(
    process_manager: ProcessManagerRef,
    request: ScanRequest,
) -> error::Result<RecordBatch> {
    let predicates = Predicates::from_scan_request(&Some(request));
    let current_time = current_time_millis();
    // TODO(hl): find a way to extract the user's catalog to filter out queries from other users.
    let queries = process_manager.list_all_processes(None).await?;

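    // One vector builder per output column, pre-sized to the number of processes.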
    let mut id_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut catalog_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut schemas_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut query_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut client_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut frontend_builder = StringVectorBuilder::with_capacity(queries.len());
    let mut start_time_builder = TimestampMillisecondVectorBuilder::with_capacity(queries.len());
    let mut elapsed_time_builder = DurationMillisecondVectorBuilder::with_capacity(queries.len());

    for process in queries {
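        // The display id pairs the frontend server address with the local
        // process id, so ids stay distinguishable across the cluster.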
        let display_id = DisplayProcessId {
            server_addr: process.frontend.to_string(),
            id: process.id,
        }
        .to_string();
        let schemas = process.schemas.join(",");
        let id = Value::from(display_id);
        let catalog = Value::from(process.catalog);
        let schemas = Value::from(schemas);
        let query = Value::from(process.query);
        let client = Value::from(process.client);
        let frontend = Value::from(process.frontend);
        let start_timestamp = Value::from(Timestamp::new_millisecond(process.start_timestamp));
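        // Elapsed time is derived at scan time rather than stored per process.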
        let elapsed_time = Value::from(Duration::new_millisecond(
            current_time - process.start_timestamp,
        ));
        let row = [
            (ID, &id),
            (CATALOG, &catalog),
            (SCHEMAS, &schemas),
            (QUERY, &query),
            (CLIENT, &client),
            (FRONTEND, &frontend),
            (START_TIMESTAMP, &start_timestamp),
            (ELAPSED_TIME, &elapsed_time),
        ];
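        // Only rows matching the pushed-down predicates are materialized into
        // the output vectors.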
        if predicates.eval(&row) {
            id_builder.push(id.as_string().as_deref());
            catalog_builder.push(catalog.as_string().as_deref());
            schemas_builder.push(schemas.as_string().as_deref());
            query_builder.push(query.as_string().as_deref());
            client_builder.push(client.as_string().as_deref());
            frontend_builder.push(frontend.as_string().as_deref());
            start_time_builder.push(start_timestamp.as_timestamp().map(|t| t.value().into()));
            elapsed_time_builder.push(elapsed_time.as_duration().map(|d| d.value().into()));
        }
    }

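    // Assemble the final record batch; the vector order must match the schema
    // column order defined in `InformationSchemaProcessList::schema`.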
    RecordBatch::new(
        InformationSchemaProcessList::schema(),
        vec![
            Arc::new(id_builder.finish()) as VectorRef,
            Arc::new(catalog_builder.finish()) as VectorRef,
            Arc::new(schemas_builder.finish()) as VectorRef,
            Arc::new(query_builder.finish()) as VectorRef,
            Arc::new(client_builder.finish()) as VectorRef,
            Arc::new(frontend_builder.finish()) as VectorRef,
            Arc::new(start_time_builder.finish()) as VectorRef,
            Arc::new(elapsed_time_builder.finish()) as VectorRef,
        ],
    )
    .context(error::CreateRecordBatchSnafu)
}