frontend/
slow_query_recorder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant, UNIX_EPOCH};
18
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
22    RowInsertRequests, Rows, SemanticType,
23};
24use catalog::CatalogManagerRef;
25use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
26use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
27use common_telemetry::{debug, error, info, slow};
28use common_time::timestamp::{TimeUnit, Timestamp};
29use operator::insert::InserterRef;
30use operator::statement::StatementExecutorRef;
31use query::parser::QueryStatement;
32use rand::random;
33use session::context::{QueryContextBuilder, QueryContextRef};
34use snafu::ResultExt;
35use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
36use table::TableRef;
37use tokio::sync::mpsc::{channel, Receiver, Sender};
38use tokio::task::JoinHandle;
39
40use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
41
/// Name of the system table that slow queries are recorded in.
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";

// Column names of the `slow_queries` system table.

/// Column holding the cost of the slow query in milliseconds.
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
/// Column holding the threshold (in milliseconds) the query exceeded.
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
/// Column holding the original query statement.
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
/// Time index column of the table.
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
/// Column telling whether the query is a PromQL query.
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
/// Column holding the start timestamp of a PromQL query in milliseconds.
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
/// Column holding the end timestamp of a PromQL query in milliseconds.
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
/// Column holding the time range of a PromQL query in milliseconds.
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
/// Column holding the step of a PromQL query in milliseconds.
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";

/// TTL applied to the `slow_queries` table when the options do not specify one.
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
/// Capacity of the channel that carries slow query events to the background handler.
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
55
56/// SlowQueryRecorder is responsible for recording slow queries.
57#[derive(Clone)]
58pub struct SlowQueryRecorder {
59    tx: Sender<SlowQueryEvent>,
60    slow_query_opts: SlowQueryOptions,
61    _handle: Arc<JoinHandle<()>>,
62}
63
64#[derive(Debug)]
65struct SlowQueryEvent {
66    cost: u64,
67    threshold: u64,
68    query: String,
69    is_promql: bool,
70    query_ctx: QueryContextRef,
71    promql_range: Option<u64>,
72    promql_step: Option<u64>,
73    promql_start: Option<i64>,
74    promql_end: Option<i64>,
75}
76
77impl SlowQueryRecorder {
78    /// Create a new SlowQueryRecorder.
79    pub fn new(
80        slow_query_opts: SlowQueryOptions,
81        inserter: InserterRef,
82        statement_executor: StatementExecutorRef,
83        catalog_manager: CatalogManagerRef,
84    ) -> Self {
85        let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
86
87        let ttl = slow_query_opts
88            .ttl
89            .clone()
90            .unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
91
92        // Start a new task to process the slow query events.
93        let event_handler = SlowQueryEventHandler {
94            inserter,
95            statement_executor,
96            catalog_manager,
97            rx,
98            record_type: slow_query_opts.record_type,
99            ttl,
100        };
101
102        // Start a new background task to process the slow query events.
103        let handle = tokio::spawn(async move {
104            event_handler.process_slow_query().await;
105        });
106
107        Self {
108            tx,
109            slow_query_opts,
110            _handle: Arc::new(handle),
111        }
112    }
113
114    /// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false.
115    /// The timer sets the start time when created and calculates the elapsed duration when dropped.
116    pub fn start(
117        &self,
118        stmt: QueryStatement,
119        query_ctx: QueryContextRef,
120    ) -> Option<SlowQueryTimer> {
121        if self.slow_query_opts.enable {
122            Some(SlowQueryTimer {
123                stmt,
124                query_ctx,
125                start: Instant::now(), // Set the initial start time.
126                threshold: self.slow_query_opts.threshold,
127                sample_ratio: self.slow_query_opts.sample_ratio,
128                tx: self.tx.clone(),
129            })
130        } else {
131            None
132        }
133    }
134}
135
136struct SlowQueryEventHandler {
137    inserter: InserterRef,
138    statement_executor: StatementExecutorRef,
139    catalog_manager: CatalogManagerRef,
140    rx: Receiver<SlowQueryEvent>,
141    record_type: SlowQueriesRecordType,
142    ttl: String,
143}
144
145impl SlowQueryEventHandler {
146    async fn process_slow_query(mut self) {
147        info!(
148            "Start the background handler to process slow query events and record them in {:?}.",
149            self.record_type
150        );
151        while let Some(event) = self.rx.recv().await {
152            self.record_slow_query(event).await;
153        }
154    }
155
156    async fn record_slow_query(&self, event: SlowQueryEvent) {
157        match self.record_type {
158            SlowQueriesRecordType::Log => {
159                // Record the slow query in a specific logs file.
160                slow!(
161                    cost = event.cost,
162                    threshold = event.threshold,
163                    query = event.query,
164                    is_promql = event.is_promql,
165                    promql_range = event.promql_range,
166                    promql_step = event.promql_step,
167                    promql_start = event.promql_start,
168                    promql_end = event.promql_end,
169                );
170            }
171            SlowQueriesRecordType::SystemTable => {
172                // Record the slow query in a system table that is stored in greptimedb itself.
173                if let Err(e) = self.insert_slow_query(&event).await {
174                    error!(e; "Failed to insert slow query, query: {:?}", event);
175                }
176            }
177        }
178    }
179
180    async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
181        debug!("Handle the slow query event: {:?}", event);
182
183        let table = if let Some(table) = self
184            .catalog_manager
185            .table(
186                event.query_ctx.current_catalog(),
187                DEFAULT_PRIVATE_SCHEMA_NAME,
188                SLOW_QUERY_TABLE_NAME,
189                Some(&event.query_ctx),
190            )
191            .await
192            .context(CatalogSnafu)?
193        {
194            table
195        } else {
196            // Create the system table if it doesn't exist.
197            self.create_system_table(event.query_ctx.clone()).await?
198        };
199
200        let insert = RowInsertRequest {
201            table_name: SLOW_QUERY_TABLE_NAME.to_string(),
202            rows: Some(Rows {
203                schema: self.build_insert_column_schema(),
204                rows: vec![Row {
205                    values: vec![
206                        ValueData::U64Value(event.cost).into(),
207                        ValueData::U64Value(event.threshold).into(),
208                        ValueData::StringValue(event.query.to_string()).into(),
209                        ValueData::BoolValue(event.is_promql).into(),
210                        ValueData::TimestampNanosecondValue(
211                            Timestamp::current_time(TimeUnit::Nanosecond).value(),
212                        )
213                        .into(),
214                        ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
215                        ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
216                        ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
217                            .into(),
218                        ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
219                    ],
220                }],
221            }),
222        };
223
224        let requests = RowInsertRequests {
225            inserts: vec![insert],
226        };
227
228        let table_info = table.table_info();
229        let query_ctx = QueryContextBuilder::default()
230            .current_catalog(table_info.catalog_name.to_string())
231            .current_schema(table_info.schema_name.to_string())
232            .build()
233            .into();
234
235        self.inserter
236            .handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
237            .await
238            .context(TableOperationSnafu)?;
239
240        Ok(())
241    }
242
243    async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
244        let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
245        if let Some(table) = self
246            .catalog_manager
247            .table(
248                &create_table_expr.catalog_name,
249                &create_table_expr.schema_name,
250                &create_table_expr.table_name,
251                Some(&query_ctx),
252            )
253            .await
254            .context(CatalogSnafu)?
255        {
256            // The table is already created, so we don't need to create it again.
257            return Ok(table);
258        }
259
260        // Create the `slow_queries` system table.
261        let table = self
262            .statement_executor
263            .create_table_inner(&mut create_table_expr, None, query_ctx.clone())
264            .await
265            .context(TableOperationSnafu)?;
266
267        info!(
268            "Create the {} system table in {:?} successfully.",
269            SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
270        );
271
272        Ok(table)
273    }
274
275    fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
276        let column_defs = vec![
277            ColumnDef {
278                name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
279                data_type: ColumnDataType::Uint64 as i32,
280                is_nullable: false,
281                default_constraint: vec![],
282                semantic_type: SemanticType::Field as i32,
283                comment: "The cost of the slow query in milliseconds".to_string(),
284                datatype_extension: None,
285                options: None,
286            },
287            ColumnDef {
288                name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
289                data_type: ColumnDataType::Uint64 as i32,
290                is_nullable: false,
291                default_constraint: vec![],
292                semantic_type: SemanticType::Field as i32,
293                comment:
294                    "When the query cost exceeds this value, it will be recorded as a slow query"
295                        .to_string(),
296                datatype_extension: None,
297                options: None,
298            },
299            ColumnDef {
300                name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
301                data_type: ColumnDataType::String as i32,
302                is_nullable: false,
303                default_constraint: vec![],
304                semantic_type: SemanticType::Field as i32,
305                comment: "The original query statement".to_string(),
306                datatype_extension: None,
307                options: None,
308            },
309            ColumnDef {
310                name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
311                data_type: ColumnDataType::Boolean as i32,
312                is_nullable: false,
313                default_constraint: vec![],
314                semantic_type: SemanticType::Field as i32,
315                comment: "Whether the query is a PromQL query".to_string(),
316                datatype_extension: None,
317                options: None,
318            },
319            ColumnDef {
320                name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
321                data_type: ColumnDataType::TimestampNanosecond as i32,
322                is_nullable: false,
323                default_constraint: vec![],
324                semantic_type: SemanticType::Timestamp as i32,
325                comment: "The timestamp of the slow query".to_string(),
326                datatype_extension: None,
327                options: None,
328            },
329            ColumnDef {
330                name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
331                data_type: ColumnDataType::Uint64 as i32,
332                is_nullable: false,
333                default_constraint: vec![],
334                semantic_type: SemanticType::Field as i32,
335                comment: "The time range of the PromQL query in milliseconds".to_string(),
336                datatype_extension: None,
337                options: None,
338            },
339            ColumnDef {
340                name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
341                data_type: ColumnDataType::Uint64 as i32,
342                is_nullable: false,
343                default_constraint: vec![],
344                semantic_type: SemanticType::Field as i32,
345                comment: "The step of the PromQL query in milliseconds".to_string(),
346                datatype_extension: None,
347                options: None,
348            },
349            ColumnDef {
350                name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
351                data_type: ColumnDataType::TimestampMillisecond as i32,
352                is_nullable: false,
353                default_constraint: vec![],
354                semantic_type: SemanticType::Field as i32,
355                comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
356                datatype_extension: None,
357                options: None,
358            },
359            ColumnDef {
360                name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
361                data_type: ColumnDataType::TimestampMillisecond as i32,
362                is_nullable: false,
363                default_constraint: vec![],
364                semantic_type: SemanticType::Field as i32,
365                comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
366                datatype_extension: None,
367                options: None,
368            },
369        ];
370
371        let table_options = HashMap::from([
372            (APPEND_MODE_KEY.to_string(), "true".to_string()),
373            (TTL_KEY.to_string(), self.ttl.to_string()),
374        ]);
375
376        CreateTableExpr {
377            catalog_name: catalog.to_string(),
378            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema.
379            table_name: SLOW_QUERY_TABLE_NAME.to_string(),
380            desc: "GreptimeDB system table for storing slow queries".to_string(),
381            column_defs,
382            time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
383            primary_keys: vec![],
384            create_if_not_exists: true,
385            table_options,
386            table_id: None,
387            engine: default_engine().to_string(),
388        }
389    }
390
391    fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
392        vec![
393            ColumnSchema {
394                column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
395                datatype: ColumnDataType::Uint64.into(),
396                semantic_type: SemanticType::Field.into(),
397                ..Default::default()
398            },
399            ColumnSchema {
400                column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
401                datatype: ColumnDataType::Uint64.into(),
402                semantic_type: SemanticType::Field.into(),
403                ..Default::default()
404            },
405            ColumnSchema {
406                column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
407                datatype: ColumnDataType::String.into(),
408                semantic_type: SemanticType::Field.into(),
409                ..Default::default()
410            },
411            ColumnSchema {
412                column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
413                datatype: ColumnDataType::Boolean.into(),
414                semantic_type: SemanticType::Field.into(),
415                ..Default::default()
416            },
417            ColumnSchema {
418                column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
419                datatype: ColumnDataType::TimestampNanosecond.into(),
420                semantic_type: SemanticType::Timestamp.into(),
421                ..Default::default()
422            },
423            ColumnSchema {
424                column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
425                datatype: ColumnDataType::Uint64.into(),
426                semantic_type: SemanticType::Field.into(),
427                ..Default::default()
428            },
429            ColumnSchema {
430                column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
431                datatype: ColumnDataType::Uint64.into(),
432                semantic_type: SemanticType::Field.into(),
433                ..Default::default()
434            },
435            ColumnSchema {
436                column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
437                datatype: ColumnDataType::TimestampMillisecond.into(),
438                semantic_type: SemanticType::Field.into(),
439                ..Default::default()
440            },
441            ColumnSchema {
442                column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
443                datatype: ColumnDataType::TimestampMillisecond.into(),
444                semantic_type: SemanticType::Field.into(),
445                ..Default::default()
446            },
447        ]
448    }
449}
450
451/// SlowQueryTimer is used to log slow query when it's dropped.
452/// In drop(), it will check if the query is slow and send the slow query event to the handler.
453pub struct SlowQueryTimer {
454    start: Instant,
455    stmt: QueryStatement,
456    query_ctx: QueryContextRef,
457    threshold: Option<Duration>,
458    sample_ratio: Option<f64>,
459    tx: Sender<SlowQueryEvent>,
460}
461
462impl SlowQueryTimer {
463    fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
464        let mut slow_query_event = SlowQueryEvent {
465            cost: elapsed.as_millis() as u64,
466            threshold: threshold.as_millis() as u64,
467            query: "".to_string(),
468            query_ctx: self.query_ctx.clone(),
469
470            // The following fields are only used for PromQL queries.
471            is_promql: false,
472            promql_range: None,
473            promql_step: None,
474            promql_start: None,
475            promql_end: None,
476        };
477
478        match &self.stmt {
479            QueryStatement::Promql(stmt) => {
480                slow_query_event.is_promql = true;
481                slow_query_event.query = stmt.expr.to_string();
482                slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
483
484                let start = stmt
485                    .start
486                    .duration_since(UNIX_EPOCH)
487                    .unwrap_or_default()
488                    .as_millis() as i64;
489
490                let end = stmt
491                    .end
492                    .duration_since(UNIX_EPOCH)
493                    .unwrap_or_default()
494                    .as_millis() as i64;
495
496                slow_query_event.promql_range = Some((end - start) as u64);
497                slow_query_event.promql_start = Some(start);
498                slow_query_event.promql_end = Some(end);
499            }
500            QueryStatement::Sql(stmt) => {
501                slow_query_event.query = stmt.to_string();
502            }
503        }
504
505        // Send SlowQueryEvent to the handler.
506        if let Err(e) = self.tx.try_send(slow_query_event) {
507            error!(e; "Failed to send slow query event");
508        }
509    }
510}
511
512impl Drop for SlowQueryTimer {
513    fn drop(&mut self) {
514        if let Some(threshold) = self.threshold {
515            // Calculate the elaspsed duration since the timer is created.
516            let elapsed = self.start.elapsed();
517            if elapsed > threshold {
518                if let Some(ratio) = self.sample_ratio {
519                    // Only capture a portion of slow queries based on sample_ratio.
520                    // Generate a random number in [0, 1) and compare it with sample_ratio.
521                    if ratio >= 1.0 || random::<f64>() <= ratio {
522                        self.send_slow_query_event(elapsed, threshold);
523                    }
524                } else {
525                    // Captures all slow queries if sample_ratio is not set.
526                    self.send_slow_query_event(elapsed, threshold);
527                }
528            }
529        }
530    }
531}